diff --git a/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml b/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
index d049a807ac1d8..233db06d61c3f 100644
--- a/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
+++ b/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
@@ -23,7 +23,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
+ execution_request_cleanup:
+ keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
+ keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
+ keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
+ batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
+ enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
extraArgs: {}
debugMode: false
executorId: default
- headers: {}
\ No newline at end of file
+ headers: {}
diff --git a/docs-website/src/pages/_components/Community/community.module.scss b/docs-website/src/pages/_components/Community/community.module.scss
index 62f0dc13b110a..65da50435597d 100644
--- a/docs-website/src/pages/_components/Community/community.module.scss
+++ b/docs-website/src/pages/_components/Community/community.module.scss
@@ -221,14 +221,14 @@
.numberContainer {
display: inline-block;
- width: 11rem;
+ width: 12rem;
text-align: right;
}
.numberChange {
display: inline-block;
animation: slideIn 0.5s ease-in-out;
- width: 11rem;
+ width: 12rem;
}
diff --git a/docs-website/src/pages/_components/QuickstartContent/quickstartcontent.module.scss b/docs-website/src/pages/_components/QuickstartContent/quickstartcontent.module.scss
index 1c1f13cbca566..27bdca710b821 100644
--- a/docs-website/src/pages/_components/QuickstartContent/quickstartcontent.module.scss
+++ b/docs-website/src/pages/_components/QuickstartContent/quickstartcontent.module.scss
@@ -129,7 +129,7 @@
.quickstart__content {
display: flex;
- margin-bottom: 3rem;
+ margin-bottom: 6rem;
width: 100%;
.quickstart__text {
diff --git a/docs-website/src/pages/solutions/_components/Integrations/index.js b/docs-website/src/pages/solutions/_components/Integrations/index.js
index 77f028eb4cf74..763c2a185c7dd 100644
--- a/docs-website/src/pages/solutions/_components/Integrations/index.js
+++ b/docs-website/src/pages/solutions/_components/Integrations/index.js
@@ -47,6 +47,7 @@ const Integrations = () => {
+ See all →
);
};
diff --git a/docs-website/src/pages/solutions/_components/Integrations/integrations.module.scss b/docs-website/src/pages/solutions/_components/Integrations/integrations.module.scss
index da0c6964e8775..07a6a2548e41a 100644
--- a/docs-website/src/pages/solutions/_components/Integrations/integrations.module.scss
+++ b/docs-website/src/pages/solutions/_components/Integrations/integrations.module.scss
@@ -1,6 +1,13 @@
.container {
display: flex;
flex-direction: column;
+ >a {
+ text-decoration: none;
+ text-align: center;
+ margin-top: 1rem;
+ margin-bottom: 1rem;
+ font-size: 1.25rem;
+ }
.section_header {
color: var(--primitives-text-tex-subtext, #777E99);
diff --git a/docs-website/src/pages/solutions/_components/IntegrationsStatic/index.js b/docs-website/src/pages/solutions/_components/IntegrationsStatic/index.js
index 76b99b156704e..17857f7b0360b 100644
--- a/docs-website/src/pages/solutions/_components/IntegrationsStatic/index.js
+++ b/docs-website/src/pages/solutions/_components/IntegrationsStatic/index.js
@@ -17,7 +17,7 @@ const Integrations = () => {
{[...Array(1)].map((_, i) => (
- {[1, 2, 3, 4, 5, 6].map((item, index) => (
+ {[1, 2, 3, 4, 5, 6, 7, 8].map((item, index) => (
))}
diff --git a/docs-website/src/pages/solutions/_components/IntegrationsStatic/integrations.module.scss b/docs-website/src/pages/solutions/_components/IntegrationsStatic/integrations.module.scss
index aa2201fd0185c..a945f4ae0598f 100644
--- a/docs-website/src/pages/solutions/_components/IntegrationsStatic/integrations.module.scss
+++ b/docs-website/src/pages/solutions/_components/IntegrationsStatic/integrations.module.scss
@@ -71,18 +71,22 @@
.slider {
position: relative;
+ display: flex;
}
.slide_track {
display: flex;
- width: max-content;
+ width: 80%;
margin: auto;
+ flex-direction: row;
+ align-items: center;
+ justify-content: space-evenly;
}
.slide {
- width: 100px;
- height: 100px;
- margin: auto 3rem;
+ width: 80px;
+ height: 80px;
+ margin: auto 0;
display: flex;
justify-content: space-between;
overflow: hidden;
@@ -99,9 +103,12 @@
max-width: 100vw;
min-width: auto;
}
+ .slide_track {
+ width: 95%;
+ }
.slide {
- width: 80px;
- height: 80px;
- margin: auto 1rem;
+ width: 40px;
+ height: 40px;
+ margin: auto 0;
}
}
\ No newline at end of file
diff --git a/docs-website/static/img/solutions/integrations-observe/logo-integration-5.png b/docs-website/static/img/solutions/integrations-observe/logo-integration-5.png
index acc17cb75d585..c4d69b1ed3bc1 100644
Binary files a/docs-website/static/img/solutions/integrations-observe/logo-integration-5.png and b/docs-website/static/img/solutions/integrations-observe/logo-integration-5.png differ
diff --git a/docs-website/static/img/solutions/integrations-observe/logo-integration-6.png b/docs-website/static/img/solutions/integrations-observe/logo-integration-6.png
index d9bb08766f527..c22402e99d1f3 100644
Binary files a/docs-website/static/img/solutions/integrations-observe/logo-integration-6.png and b/docs-website/static/img/solutions/integrations-observe/logo-integration-6.png differ
diff --git a/docs-website/static/img/solutions/integrations-observe/logo-integration-7.png b/docs-website/static/img/solutions/integrations-observe/logo-integration-7.png
new file mode 100644
index 0000000000000..acc17cb75d585
Binary files /dev/null and b/docs-website/static/img/solutions/integrations-observe/logo-integration-7.png differ
diff --git a/docs-website/static/img/solutions/integrations-observe/logo-integration-8.png b/docs-website/static/img/solutions/integrations-observe/logo-integration-8.png
new file mode 100644
index 0000000000000..d9bb08766f527
Binary files /dev/null and b/docs-website/static/img/solutions/integrations-observe/logo-integration-8.png differ
diff --git a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
index 21734cd4e03fa..05e5205f7da41 100644
--- a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
+++ b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
@@ -22,3 +22,14 @@ source:
soft_deleted_entities_cleanup:
# Delete soft deleted entities which were deleted 10 days ago
retention_days: 10
+ execution_request_cleanup:
+ # Minimum number of execution requests to keep, per ingestion source
+ keep_history_min_count: 10
+ # Maximum number of execution requests to keep, per ingestion source
+ keep_history_max_count: 1000
+ # Maximum number of days to keep execution requests for, per ingestion source
+ keep_history_max_days: 30
+ # Number of records per read operation
+ batch_read_size: 100
+ # Global switch for this cleanup task
+ enabled: true
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 1897f3f288ec0..c4b4186f45fc3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -24,6 +24,11 @@
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
+from datahub.ingestion.source.gc.execution_request_cleanup import (
+ DatahubExecutionRequestCleanup,
+ DatahubExecutionRequestCleanupConfig,
+ DatahubExecutionRequestCleanupReport,
+)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
@@ -70,9 +75,18 @@ class DataHubGcSourceConfig(ConfigModel):
description="Configuration for soft deleted entities cleanup",
)
+ execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
+ default=None,
+ description="Configuration for execution request cleanup",
+ )
+
@dataclass
-class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
+class DataHubGcSourceReport(
+ DataProcessCleanupReport,
+ SoftDeletedEntitiesReport,
+ DatahubExecutionRequestCleanupReport,
+):
expired_tokens_revoked: int = 0
@@ -97,6 +111,7 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.graph = ctx.require_graph("The DataHubGc source")
self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
+ self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None
if self.config.dataprocess_cleanup:
self.dataprocess_cleanup = DataProcessCleanup(
@@ -109,6 +124,12 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.report,
self.config.dry_run,
)
+ if self.config.execution_request_cleanup:
+ self.execution_request_cleanup = DatahubExecutionRequestCleanup(
+ config=self.config.execution_request_cleanup,
+ graph=self.graph,
+ report=self.report,
+ )
@classmethod
def create(cls, config_dict, ctx):
@@ -130,6 +151,8 @@ def get_workunits_internal(
yield from self.dataprocess_cleanup.get_workunits_internal()
if self.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
+ if self.execution_request_cleanup:
+ self.execution_request_cleanup.run()
yield from []
def truncate_indices(self) -> None:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
new file mode 100644
index 0000000000000..570df4e99ab13
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
@@ -0,0 +1,240 @@
+import logging
+import time
+from typing import Any, Dict, Iterator, Optional
+
+from pydantic import BaseModel, Field
+
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.api.source import SourceReport
+from datahub.ingestion.graph.client import DataHubGraph
+
+logger = logging.getLogger(__name__)
+
+DATAHUB_EXECUTION_REQUEST_ENTITY_NAME = "dataHubExecutionRequest"
+DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME = "dataHubExecutionRequestKey"
+DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"
+DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"
+
+
+class DatahubExecutionRequestCleanupConfig(ConfigModel):
+ keep_history_min_count: int = Field(
+ 10,
+ description="Minimum number of execution requests to keep, per ingestion source",
+ )
+
+ keep_history_max_count: int = Field(
+ 1000,
+ description="Maximum number of execution requests to keep, per ingestion source",
+ )
+
+ keep_history_max_days: int = Field(
+ 30,
+ description="Maximum number of days to keep execution requests for, per ingestion source",
+ )
+
+ batch_read_size: int = Field(
+ 100,
+ description="Number of records per read operation",
+ )
+
+ enabled: bool = Field(
+ True,
+ description="Global switch for this cleanup task",
+ )
+
+ def keep_history_max_milliseconds(self):
+ return self.keep_history_max_days * 24 * 3600 * 1000
+
+
+class DatahubExecutionRequestCleanupReport(SourceReport):
+ execution_request_cleanup_records_read: int = 0
+ execution_request_cleanup_records_preserved: int = 0
+ execution_request_cleanup_records_deleted: int = 0
+ execution_request_cleanup_read_errors: int = 0
+ execution_request_cleanup_delete_errors: int = 0
+
+
+class CleanupRecord(BaseModel):
+ urn: str
+ request_id: str
+ status: str
+ ingestion_source: str
+ requested_at: int
+
+
+class DatahubExecutionRequestCleanup:
+ def __init__(
+ self,
+ graph: DataHubGraph,
+ report: DatahubExecutionRequestCleanupReport,
+ config: Optional[DatahubExecutionRequestCleanupConfig] = None,
+ ) -> None:
+
+ self.graph = graph
+ self.report = report
+ self.instance_id = int(time.time())
+
+ if config is not None:
+ self.config = config
+ else:
+ self.config = DatahubExecutionRequestCleanupConfig()
+
+ def _to_cleanup_record(self, entry: Dict) -> CleanupRecord:
+ input_aspect = (
+ entry.get("aspects", {})
+ .get(DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME, {})
+ .get("value", {})
+ )
+ result_aspect = (
+ entry.get("aspects", {})
+ .get(DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME, {})
+ .get("value", {})
+ )
+ key_aspect = (
+ entry.get("aspects", {})
+ .get(DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME, {})
+ .get("value", {})
+ )
+ return CleanupRecord(
+ urn=entry.get("urn"),
+ request_id=key_aspect.get("id"),
+ requested_at=input_aspect.get("requestedAt", 0),
+ status=result_aspect.get("status", "PENDING"),
+ ingestion_source=input_aspect.get("source", {}).get("ingestionSource", ""),
+ )
+
+ def _scroll_execution_requests(
+ self, overrides: Dict[str, Any] = {}
+ ) -> Iterator[CleanupRecord]:
+ headers: Dict[str, Any] = {
+ "Accept": "application/json",
+ "Content-Type": "application/json",
+ }
+ params = {
+ "aspectNames": [
+ DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME,
+ DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME,
+ DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME,
+ ],
+ "count": str(self.config.batch_read_size),
+ "sort": "requestTimeMs",
+ "sortOrder": "DESCENDING",
+ "systemMetadata": "false",
+ "skipCache": "true",
+ }
+ params.update(overrides)
+
+ while True:
+ try:
+ url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
+ response = self.graph._session.get(url, headers=headers, params=params)
+ response.raise_for_status()
+ document = response.json()
+
+ entries = document.get("results", [])
+ for entry in entries:
+ yield self._to_cleanup_record(entry)
+
+ if "scrollId" not in document:
+ break
+ params["scrollId"] = document["scrollId"]
+ except Exception as e:
+ logger.error(
+ f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
+ )
+ self.report.execution_request_cleanup_read_errors += 1
+
+ def _scroll_garbage_records(self):
+ state: Dict[str, Dict] = {}
+
+ now_ms = int(time.time()) * 1000
+ running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000
+
+ for entry in self._scroll_execution_requests():
+ self.report.execution_request_cleanup_records_read += 1
+ key = entry.ingestion_source
+
+ # Always delete corrupted records
+ if not key:
+ logger.warning(
+ f"ergc({self.instance_id}): will delete corrupted entry with missing source key: {entry}"
+ )
+ yield entry
+ continue
+
+ if key not in state:
+ state[key] = {}
+ state[key]["cutoffTimestamp"] = (
+ entry.requested_at - self.config.keep_history_max_milliseconds()
+ )
+
+ state[key]["count"] = state[key].get("count", 0) + 1
+
+ # Do not delete if number of requests is below minimum
+ if state[key]["count"] < self.config.keep_history_min_count:
+ self.report.execution_request_cleanup_records_preserved += 1
+ continue
+
+ # Keep the record while this source's request count is still below the
+ # allowed maximum AND the record is newer than the per-source cutoff date.
+ if (state[key]["count"] < self.config.keep_history_max_count) and (
+ entry.requested_at > state[key]["cutoffTimestamp"]
+ ):
+ self.report.execution_request_cleanup_records_preserved += 1
+ continue
+
+ # Do not delete if the status is RUNNING or PENDING and the record was created within the
+ # last month; a record over a month old that never reached a final state likely has no value.
+ if entry.requested_at > running_guard_timeout and entry.status in [
+ "RUNNING",
+ "PENDING",
+ ]:
+ self.report.execution_request_cleanup_records_preserved += 1
+ continue
+
+ # Otherwise delete current record
+ logger.info(
+ (
+ f"ergc({self.instance_id}): going to delete {entry.request_id} in source {key}; "
+ f"source count: {state[key]['count']}; "
+ f"source cutoff: {state[key]['cutoffTimestamp']}; "
+ f"record timestamp: {entry.requested_at}."
+ )
+ )
+ self.report.execution_request_cleanup_records_deleted += 1
+ yield entry
+
+ def _delete_entry(self, entry: CleanupRecord) -> None:
+ try:
+ logger.info(
+ f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}"
+ )
+ self.graph.delete_entity(entry.urn, True)
+ except Exception as e:
+ self.report.execution_request_cleanup_delete_errors += 1
+ logger.error(
+ f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
+ )
+
+ def run(self) -> None:
+ if not self.config.enabled:
+ logger.info(
+ f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
+ )
+ return
+
+ logger.info(
+ (
+ f"ergc({self.instance_id}): Starting cleanup of ExecutionRequest records; "
+ f"max days: {self.config.keep_history_max_days}, "
+ f"min records: {self.config.keep_history_min_count}, "
+ f"max records: {self.config.keep_history_max_count}."
+ )
+ )
+
+ for entry in self._scroll_garbage_records():
+ self._delete_entry(entry)
+
+ logger.info(
+ f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
+ )
diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
index 10ae176b2c31e..dda79120118e5 100644
--- a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
+++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
@@ -38,7 +38,7 @@ bootstrap:
# Ingestion Recipes
- name: ingestion-datahub-gc
- version: v1
+ version: v2
optional: true
mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml"
- values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
\ No newline at end of file
+ values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
index e70ab1162a381..f30ce148ec6cb 100644
--- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
+++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
@@ -27,7 +27,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
- extraArgs: {}
+ execution_request_cleanup:
+ keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
+ keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
+ keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
+ batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
+ enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
+ extraArgs: {}
debugMode: false
executorId: default
source: