Skip to content

Commit

Permalink
feat: #OBS-I321: Added dataset metric service file
Browse files Browse the repository at this point in the history
  • Loading branch information
yashashkumar committed Jan 15, 2025
1 parent e95ca5f commit cf0370f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@ import { Request, Response } from "express";
import * as _ from "lodash"
import { ResponseHandler } from "../../helpers/ResponseHandler";
import dayjs from 'dayjs';
import { handleConnectors, handleDataFreshness, handleDataLineage, handleDataObservability, handleDataQuality, handleDataVolume } from "./datasetMetricsHelper";
import logger from "../../logger";
import { schemaValidation } from "../../services/ValidationService";
import validationSchema from "./DatasetMetrics.json";
import { config } from "../../configs/Config";
import { datasetService } from "../../services/DatasetService";
import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataQuality, getDataVolume } from "../../services/DatasetMetricsService";

const apiId = "api.dataset.metrics";
const datasetMetrics = async (req: Request, res: Response) => {
const msgid = _.get(req, "body.params.msgid");
const requestBody = req.body;
const dataset_id = _.get(req, "body.request.dataset_id");
const startDateValue = _.get(req, "body.request.start_date") || config?.data_observability?.default_query_time_period;

const { category, volume_by_days = 10 }: any = req.body.request;
const defaultThreshold = (typeof config?.data_observability?.default_freshness_threshold === 'number' ? config?.data_observability?.default_freshness_threshold : 5) * 60 * 1000; // 5 minutes in milliseconds
const dateFormat = 'YYYY-MM-DDTHH:mm:ss';
const startDate = '2000-01-01T00:00:00+05:30';
const endDate = dayjs().add(1, 'day').format(dateFormat);
const endDate = dayjs().format(dateFormat);
const startDate = dayjs(endDate).subtract(startDateValue, 'day').format(dateFormat);
const intervals = `${startDate}/${endDate}`;
const isValidSchema = schemaValidation(requestBody, validationSchema);
const results = [];

if (!isValidSchema?.isValid) {
logger.error({ apiId, datasetId: dataset_id, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" })
return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res);
Expand All @@ -37,32 +38,32 @@ const datasetMetrics = async (req: Request, res: Response) => {

try {
if (!category || category.includes("data_freshness")) {
const dataFreshnessResult = await handleDataFreshness(dataset_id, intervals, defaultThreshold);
const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold);
results.push(dataFreshnessResult);
}

if (!category || category.includes("data_observability")) {
const dataObservabilityResult = await handleDataObservability(dataset_id, intervals);
const dataObservabilityResult = await getDataObservability(dataset_id, intervals);
results.push(dataObservabilityResult);
}

if (!category || category.includes("data_volume")) {
const dataVolumeResult = await handleDataVolume(dataset_id, volume_by_days, dateFormat);
const dataVolumeResult = await getDataVolume(dataset_id, volume_by_days, dateFormat);
results.push(dataVolumeResult);
}

if (!category || category.includes("data_lineage")) {
const dataLineageResult = await handleDataLineage(dataset_id, intervals);
const dataLineageResult = await getDataLineage(dataset_id, intervals);
results.push(dataLineageResult);
}

if (!category || category.includes("connectors")) {
const connectorsResult = await handleConnectors(dataset_id, intervals);
const connectorsResult = await getConnectors(dataset_id, intervals);
results.push(connectorsResult);
}

if (!category || category.includes("data_quality")) {
const connectorsResult = await handleDataQuality(dataset_id, intervals);
const connectorsResult = await getDataQuality(dataset_id, intervals);
results.push(connectorsResult);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import axios from "axios";
import dayjs from "dayjs";
import _ from "lodash";
import { config } from "../../configs/Config";
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "./queries";
import { config } from "../configs/Config";
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries";
const druidPort = _.get(config, "query_api.druid.port");
const druidHost = _.get(config, "query_api.druid.host");
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`;

export const handleDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
const queryPayload = processingTimeQuery(intervals, dataset_id);
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query);
const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0);
Expand All @@ -34,7 +34,7 @@ export const handleDataFreshness = async (dataset_id: string, intervals: string,
};
};

export const handleDataObservability = async (dataset_id: string, intervals: string) => {
export const getDataObservability = async (dataset_id: string, intervals: string) => {
const totalEventsPayload = totalEventsQuery(intervals, dataset_id);
const totalFailedEventsPayload = totalFailedEventsQuery(intervals, dataset_id);
const totalQueryCalls = generateTotalQueryCallsQuery(config?.data_observability?.data_out_query_time_period);
Expand Down Expand Up @@ -89,7 +89,7 @@ export const handleDataObservability = async (dataset_id: string, intervals: str
};
};

export const handleDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => {
export const getDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => {
const currentHourIntervals = dayjs().subtract(1, "hour").startOf("hour").toISOString() + "/" + dayjs().startOf("hour").toISOString();
const currentDayIntervals = dayjs().subtract(1, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat);
const currentWeekIntervals = dayjs().subtract(1, 'week').startOf('week').format(dateFormat) + '/' + dayjs().endOf('week').format(dateFormat);
Expand Down Expand Up @@ -144,7 +144,7 @@ export const handleDataVolume = async (dataset_id: string, volume_by_days: numbe
};
};

export const handleDataLineage = async (dataset_id: string, intervals: string) => {
export const getDataLineage = async (dataset_id: string, intervals: string) => {
const transformationSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "transformer_status", "success");
const dedupSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "dedup_status", "success");
const denormSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "denorm_status", "success");
Expand Down Expand Up @@ -196,7 +196,7 @@ export const handleDataLineage = async (dataset_id: string, intervals: string) =
};


export const handleConnectors = async (dataset_id: string, intervals: string) => {
export const getConnectors = async (dataset_id: string, intervals: string) => {
const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id);
const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload);
const connectorsData = _.get(connectorResponse, "data[0].result", []);
Expand All @@ -215,7 +215,7 @@ export const handleConnectors = async (dataset_id: string, intervals: string) =>
};
};

export const handleDataQuality = async (dataset_id: string, intervals: string) => {
export const getDataQuality = async (dataset_id: string, intervals: string) => {
const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id);
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
const [totalValidationResponse, totalValidationFailedResponse,
Expand Down

0 comments on commit cf0370f

Please sign in to comment.