Dataset Observability #310

Closed · wants to merge 15 commits
1 change: 1 addition & 0 deletions api-service/package.json
@@ -42,6 +42,7 @@
"busboy": "^1.6.0",
"compression": "^1.7.4",
"dateformat": "2.0.0",
"dayjs": "^1.11.13",
"express": "^5.0.0-beta.3",
"http-errors": "^2.0.0",
"http-status": "^1.5.3",
6 changes: 5 additions & 1 deletion api-service/src/configs/Config.ts
@@ -119,5 +119,9 @@ export const config = {
"enable": process.env.otel_enable || "false",
"collector_endpoint": process.env.otel_collector_endpoint || "http://localhost:4318"
},
"storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}'
"storage_types": process.env.storage_types || '{"lake_house":true,"realtime_store":true}',
"data_observability": {
"default_freshness_threshold": process.env.default_freshness_threshold || 5, // in minutes
"data_out_query_time_period": process.env.data_out_query_time_period || "1d",
}
}
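
Both new data_observability settings can be overridden through environment variables. A minimal sketch, using the variable names read by the config above (the values shown are illustrative):

export default_freshness_threshold=10        # freshness threshold, in minutes
export data_out_query_time_period="7d"       # Prometheus window for data-out call counts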
72 changes: 72 additions & 0 deletions api-service/src/controllers/TableMetrics/Metrics.ts
@@ -0,0 +1,72 @@
import { Request, Response } from "express";
import * as _ from "lodash"
import { ResponseHandler } from "../../helpers/ResponseHandler";
import dayjs from 'dayjs';
import { handleConnectors, handleDataFreshness, handleDataLineage, handleDataObservability, handleDataQuality, handleDataVolume } from "./metricsHelper";
import logger from "../../logger";
import { schemaValidation } from "../../services/ValidationService";
import validationSchema from "./TableMetrics.json";
import { config } from "../../configs/Config";

const apiId = "api.table.metrics";
const tableMetrics = async (req: Request, res: Response) => {
const { dataset_id } = req.params;
const msgid = _.get(req, "body.params.msgid");
const requestBody = req.body;

const { category, volume_by_days }: any = req.body.request;
const defaultThreshold = (Number(config?.data_observability?.default_freshness_threshold) || 5) * 60 * 1000; // threshold is configured in minutes; convert to milliseconds (default: 5 minutes)
const dateFormat = 'YYYY-MM-DDTHH:mm:ss';
const startDate = '2000-01-01T00:00:00+05:30';
const endDate = dayjs().add(1, 'day').format(dateFormat);
const intervals = `${startDate}/${endDate}`;
const isValidSchema = schemaValidation(requestBody, validationSchema);
const results = [];
if (!isValidSchema?.isValid) {
logger.error({ apiId, datasetId: dataset_id, msgid, requestBody, message: isValidSchema?.message, code: "DATA_OUT_INVALID_INPUT" })
return ResponseHandler.errorResponse({ message: isValidSchema?.message, statusCode: 400, errCode: "BAD_REQUEST", code: "DATA_OUT_INVALID_INPUT" }, req, res);
}

try {
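// NOTE: the guards below treat `category` as optional (all metrics are returned when
// it is omitted), but TableMetrics.json lists `category` as required, so the
// `!category` branches are reachable only if that schema is relaxed.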
if (!category || category.includes("data_freshness")) {
const dataFreshnessResult = await handleDataFreshness(dataset_id, intervals, defaultThreshold);
results.push(dataFreshnessResult);
}

if (!category || category.includes("data_observability")) {
const dataObservabilityResult = await handleDataObservability(dataset_id, intervals);
results.push(dataObservabilityResult);
}

if (!category || category.includes("data_volume")) {
const dataVolumeResult = await handleDataVolume(dataset_id, volume_by_days, dateFormat);
results.push(dataVolumeResult);
}

if (!category || category.includes("data_lineage")) {
const dataLineageResult = await handleDataLineage(dataset_id, intervals);
results.push(dataLineageResult);
}

if (!category || category.includes("connectors")) {
const connectorsResult = await handleConnectors(dataset_id, intervals);
results.push(connectorsResult);
}

if (!category || category.includes("data_quality")) {
const dataQualityResult = await handleDataQuality(dataset_id, intervals);
results.push(dataQualityResult);
}

logger.info({ apiId, msgid, requestBody, datasetId: dataset_id, message: "Metrics fetched successfully" })
return ResponseHandler.successResponse(req, res, { status: 200, data: results });

}
catch (error: any) {
logger.error({ apiId, msgid, requestBody: req?.body, datasetId: dataset_id, message: "Error while fetching metrics", code: "FAILED_TO_FETCH_METRICS", error });
return ResponseHandler.errorResponse({ message: "Error while fetching metrics", statusCode: 500, errCode: "FAILED", code: "FAILED_TO_FETCH_METRICS" }, req, res);
}

}

export default tableMetrics;
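
For reference, a call to this controller might look like the sketch below, assuming the route is mounted at POST /api/table/metrics/:dataset_id with the service on localhost:3000 (route registration is not part of this diff); the ver, ts, and msgid values are illustrative:

import axios from "axios";

const response = await axios.post("http://localhost:3000/api/table/metrics/my-dataset", {
  id: "api.table.metrics", // must match the enum in TableMetrics.json
  ver: "v1",
  ts: "2024-01-01T00:00:00+05:30",
  params: { msgid: "sample-msgid" },
  request: { category: ["data_freshness", "data_volume"], volume_by_days: 7 }
});
console.log(response.data);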
57 changes: 57 additions & 0 deletions api-service/src/controllers/TableMetrics/TableMetrics.json
@@ -0,0 +1,57 @@
{
"type": "object",
"properties": {
"id": {
"type": "string",
"enum": [
"api.table.metrics"
]
},
"ver": {
"type": "string"
},
"ts": {
"type": "string"
},
"params": {
"type": "object",
"properties": {
"msgid": {
"type": "string"
}
},
"required": [
"msgid"
],
"additionalProperties": false
},
"request": {
"type": "object",
"properties": {
"category": {
"type": "array",
"items": {
"type": "string",
"enum": [
"data_freshness",
"data_observability",
"data_volume",
"data_lineage",
"connectors",
"data_quality"
]
},
"minItems": 1
},
"volume_by_days": {
"type": "integer",
"minimum": 1
}
},
"required": [
"category"
]
}
},
"required": ["id", "ver", "ts", "params", "request"]
}
238 changes: 238 additions & 0 deletions api-service/src/controllers/TableMetrics/metricsHelper.ts
@@ -0,0 +1,238 @@
import axios from "axios";
import dayjs from "dayjs";
import _ from "lodash";
import { config } from "../../configs/Config";
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "./queries";
const druidPort = _.get(config, "query_api.druid.port");
const druidHost = _.get(config, "query_api.druid.host");
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`;

export const handleDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
const queryPayload = processingTimeQuery(intervals, dataset_id);
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query);
const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0);
const freshnessStatus = avgProcessingTime < defaultThreshold ? "Healthy" : "Unhealthy";

return {
category: "data_freshness",
status: freshnessStatus,
components: [
{
type: "average_time_difference_in_min",
threshold: defaultThreshold / 60000, // convert to minutes
value: avgProcessingTime / 60000,
status: freshnessStatus
},
{
type: "freshness_query_time_in_min",
threshold: 10,
value: avgProcessingTime / 60000, // convert to minutes
status: freshnessStatus
}
]
};
};
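// Example: avgProcessingTime = 120000 ms is 2 minutes, below the default 5-minute
// threshold (300000 ms), so both components report value = 2 with status "Healthy".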

export const handleDataObservability = async (dataset_id: string, intervals: string) => {
const totalEventsPayload = totalEventsQuery(intervals, dataset_id);
const totalFailedEventsPayload = totalFailedEventsQuery(intervals, dataset_id);
const totalQueryCalls = generateTotalQueryCallsQuery(config?.data_observability?.data_out_query_time_period);
const totalQueryCallsAtDatasetLevel = generateDatasetQueryCallsQuery(dataset_id, config?.data_observability?.data_out_query_time_period);

const [totalEventsResponse, totalFailedEventsResponse, totalApiCallsResponse, totalCallsAtDatasetLevelResponse] = await Promise.all([
axios.post(nativeQueryEndpoint, totalEventsPayload),
axios.post(nativeQueryEndpoint, totalFailedEventsPayload),
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCalls }),
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCallsAtDatasetLevel })
]);

const totalApiCalls = _.map(_.get(totalApiCallsResponse, 'data.data.result'), payload => {
return _.floor(_.get(payload, 'values[0][1]'), 3) || 0
})

const totalApiCallsAtDatasetLevel = _.map(_.get(totalCallsAtDatasetLevelResponse, 'data.data.result'), payload => {
return _.floor(_.get(payload, 'values[0][1]'), 3) || 0
})

// Importance score: this dataset's share of all query-API calls, as a percentage.
// Guard against empty Prometheus results and division by zero.
const totalCalls = totalApiCalls[0] || 0;
const datasetCalls = totalApiCallsAtDatasetLevel[0] || 0;
const importanceScore = totalCalls > 0 ? (datasetCalls / totalCalls) * 100 : 0;

const totalEventsCount = _.get(totalEventsResponse, "data[0].result.total_events_count") || 0;
const totalFailedEventsCount = _.get(totalFailedEventsResponse, "data[0].result.total_failed_events") || 0;
let failurePercentage = 0;
if (totalEventsCount > 0) {
failurePercentage = (totalFailedEventsCount / totalEventsCount) * 100;
}
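// Example: 10,000 total events with 200 failures gives failurePercentage = 2
// (Healthy); 600 failures gives 6, which exceeds the 5% threshold (Unhealthy).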
const observabilityStatus = failurePercentage > 5 ? "Unhealthy" : "Healthy";

return {
category: "data_observability",
status: observabilityStatus,
components: [
{
type: "data_observability_health",
status: observabilityStatus
},
{
type: "failure_percentage",
value: failurePercentage
},
{
type: "threshold_percentage",
value: 5
},
{
type: "importance_score",
value: importanceScore
}
]
};
};

export const handleDataVolume = async (dataset_id: string, volume_by_days: number, dateFormat: string) => {
const currentHourIntervals = dayjs().subtract(1, "hour").startOf("hour").toISOString() + "/" + dayjs().startOf("hour").toISOString();
const currentDayIntervals = dayjs().subtract(1, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat);
const currentWeekIntervals = dayjs().subtract(1, 'week').startOf('week').format(dateFormat) + '/' + dayjs().endOf('week').format(dateFormat);
const previousHourIntervals = dayjs().subtract(2, "hour").startOf("hour").toISOString() + "/" + dayjs().subtract(1, "hour").startOf("hour").toISOString(); // end at the start of the current-hour window so the two windows do not overlap
const previousDayIntervals = dayjs().subtract(2, 'day').startOf('day').format(dateFormat) + '/' + dayjs().subtract(1, 'day').endOf('day').format(dateFormat);
const previousWeekIntervals = dayjs().subtract(2, 'week').startOf('week').format(dateFormat) + '/' + dayjs().subtract(1, 'week').endOf('week').format(dateFormat);
const nDaysIntervals = dayjs().subtract(volume_by_days, 'day').startOf('day').format(dateFormat) + '/' + dayjs().endOf('day').format(dateFormat);

const currentHourPayload = generateTimeseriesQueryEventsPerHour(currentHourIntervals, dataset_id);
const currentDayPayload = generateTimeseriesQuery(currentDayIntervals, dataset_id);
const currentWeekPayload = generateTimeseriesQuery(currentWeekIntervals, dataset_id);
const previousHourPayload = generateTimeseriesQueryEventsPerHour(previousHourIntervals, dataset_id);
const previousDayPayload = generateTimeseriesQuery(previousDayIntervals, dataset_id);
const previousWeekPayload = generateTimeseriesQuery(previousWeekIntervals, dataset_id);
const nDaysPayload = generateTimeseriesQuery(nDaysIntervals, dataset_id);
const [
currentHourResponse, currentDayResponse, currentWeekResponse,
previousHourResponse, previousDayResponse, previousWeekResponse,
nDaysResponse
] = await Promise.all([
axios.post(nativeQueryEndpoint, currentHourPayload),
axios.post(nativeQueryEndpoint, currentDayPayload),
axios.post(nativeQueryEndpoint, currentWeekPayload),
axios.post(nativeQueryEndpoint, previousHourPayload),
axios.post(nativeQueryEndpoint, previousDayPayload),
axios.post(nativeQueryEndpoint, previousWeekPayload),
axios.post(nativeQueryEndpoint, nDaysPayload)
]);
const currentHourCount = _.get(currentHourResponse, "data[0].result.count") || 0;
const currentDayCount = _.get(currentDayResponse, "data[0].result.count") || 0;
const currentWeekCount = _.get(currentWeekResponse, "data[0].result.count") || 0;
const previousHourCount = _.get(previousHourResponse, "data[0].result.count") || 0;
const previousDayCount = _.get(previousDayResponse, "data[0].result.count") || 0;
const previousWeekCount = _.get(previousWeekResponse, "data[0].result.count") || 0;
const nDaysCount = _.get(nDaysResponse, "data[0].result.count") || 0;

const volumePercentageByHour = previousHourCount > 0 ? ((currentHourCount - previousHourCount) / previousHourCount) * 100 : 0;
const volumePercentageByDay = previousDayCount > 0 ? ((currentDayCount - previousDayCount) / previousDayCount) * 100 : 0;
const volumePercentageByWeek = previousWeekCount > 0 ? ((currentWeekCount - previousWeekCount) / previousWeekCount) * 100 : 0;
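// Example: previousDayCount = 1000 and currentDayCount = 1200 gives
// volumePercentageByDay = 20; a drop to 800 would give -20.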

return {
category: "data_volume",
components: [
{ type: "events_per_hour", value: currentHourCount },
{ type: "events_per_day", value: currentDayCount },
{ type: "events_per_n_day", value: nDaysCount },
{ type: "volume_percentage_by_hour", value: volumePercentageByHour },
{ type: "volume_percentage_by_day", value: volumePercentageByDay },
{ type: "volume_percentage_by_week", value: volumePercentageByWeek },
{ type: "growth_rate_percentage", value: volumePercentageByHour }
]
};
};

export const handleDataLineage = async (dataset_id: string, intervals: string) => {
const transformationSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "transformer_status", "success");
const dedupSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "dedup_status", "success");
const denormSuccessPayload = dataLineageSuccessQuery(intervals, dataset_id, "denorm_status", "success");
const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id);
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
const transformationFailedPayload = generateTransformationFailedQuery(intervals, dataset_id);
const dedupFailedPayload = generateDedupFailedQuery(intervals, dataset_id);
const denormFailedPayload = generateDenormFailedQuery(intervals, dataset_id);

const [
transformationSuccessResponse, dedupSuccessResponse, denormSuccessResponse,
totalValidationResponse, totalValidationFailedResponse, transformationFailedResponse, dedupFailedResponse, denormFailedResponse
] = await Promise.all([
axios.post(nativeQueryEndpoint, transformationSuccessPayload),
axios.post(nativeQueryEndpoint, dedupSuccessPayload),
axios.post(nativeQueryEndpoint, denormSuccessPayload),
axios.post(nativeQueryEndpoint, totalValidationPayload),
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
axios.post(nativeQueryEndpoint, transformationFailedPayload),
axios.post(nativeQueryEndpoint, dedupFailedPayload),
axios.post(nativeQueryEndpoint, denormFailedPayload)
]);

// success at each level
const transformationSuccessCount = _.get(transformationSuccessResponse, "data[0].result.count") || 0;
const dedupSuccessCount = _.get(dedupSuccessResponse, "data[0].result.count") || 0;
const denormSuccessCount = _.get(denormSuccessResponse, "data[0].result.count") || 0;
const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0;
const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0;
const storageSuccessCount = totalValidationCount - totalValidationFailedCount;

// failure at each level
const transformationFailedCount = _.get(transformationFailedResponse, "data[0].result.count") || 0;
const dedupFailedCount = _.get(dedupFailedResponse, "data[0].result.count") || 0;
const denormFailedCount = _.get(denormFailedResponse, "data[0].result.count") || 0;
return {
category: "data_lineage",
components: [
{ type: "transformation_success", value: transformationSuccessCount },
{ type: "dedup_success", value: dedupSuccessCount },
{ type: "denormalization_success", value: denormSuccessCount },
{ type: "total_success", value: storageSuccessCount },
{ type: "total_failed", value: totalValidationFailedCount },
{ type: "transformation_failed", value: transformationFailedCount },
{ type: "dedup_failed", value: dedupFailedCount },
{ type: "denorm_failed", value: denormFailedCount }
]
};
};


export const handleConnectors = async (dataset_id: string, intervals: string) => {
const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id);
const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload);
const connectorsData = _.get(connectorResponse, "data[0].result", []);
// Map each connector row to a component; rows with a null name arrived without a
// connector and are reported as "failed". (Returning the mapped object directly
// avoids nesting a second category/components pair under components.)
return {
category: "connectors",
components: connectorsData.map((item: any) => ({
id: item.name || "failed",
type: item.name === null ? "failed" : "success",
value: item.count
}))
};
};

export const handleDataQuality = async (dataset_id: string, intervals: string) => {
const totalValidationPayload = dataLineageSuccessQuery(intervals, dataset_id, "ctx_dataset", dataset_id);
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
const [totalValidationResponse, totalValidationFailedResponse] = await Promise.all([
axios.post(nativeQueryEndpoint, totalValidationPayload),
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
]);
const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0;
const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0;
const storageSuccessCount = totalValidationCount - totalValidationFailedCount;

return {
category: "data_quality",
"components": [
{ type: "incidents_failed", value: totalValidationFailedCount },
{ type: "incidents_success", value: storageSuccessCount },
{ type: "total_incidents", value: totalValidationCount }
]
};
}
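
Taken together, the handlers above populate the data array that tableMetrics returns. A sketch of its shape when no category filter is supplied (component values elided; the outer response envelope depends on ResponseHandler, which is not part of this diff):

[
  { "category": "data_freshness", "status": "Healthy", "components": [ ... ] },
  { "category": "data_observability", "status": "Healthy", "components": [ ... ] },
  { "category": "data_volume", "components": [ ... ] },
  { "category": "data_lineage", "components": [ ... ] },
  { "category": "connectors", "components": [ ... ] },
  { "category": "data_quality", "components": [ ... ] }
]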