Skip to content

Commit

Permalink
Revert "split ownership pr"
Browse files Browse the repository at this point in the history
  • Loading branch information
llance committed Jan 13, 2025
1 parent db60405 commit 25161fc
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 22 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class PresetSource(SupersetSource):
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
Expand Down
27 changes: 5 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,15 +555,10 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"

datasource_id = chart_data.get("datasource_id")
if not datasource_id:
logger.debug(
f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info'
)
datasource_urn = None
else:
dataset_response = self.get_dataset_info(datasource_id)
ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn
dataset_response = self.get_dataset_info(datasource_id)
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)

params = json.loads(chart_data.get("params", "{}"))
metrics = [
Expand Down Expand Up @@ -744,11 +739,7 @@ def construct_dataset_from_dataset_data(
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
modified_ts = int(
dp.parse(dataset_data.get("changed_on_utc", "now")).timestamp() * 1000
)
modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}"
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}"

dataset_info = DatasetPropertiesClass(
Expand Down Expand Up @@ -779,13 +770,6 @@ def construct_dataset_from_dataset_data(
],
)
aspects_items.append(owners_info)

metrics = dataset_response.get("result", {}).get("metrics", [])

if metrics:
glossary_terms = self.parse_glossary_terms_from_metrics(metrics, last_modified)
aspects_items.append(glossary_terms)

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,
Expand All @@ -798,7 +782,6 @@ def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
except Exception as e:
self.report.warning(
Expand Down

0 comments on commit 25161fc

Please sign in to comment.