
Commit

Merge branch 'datahub-project:master' into master
sgomezvillamor authored Feb 5, 2025
2 parents 4d67499 + 66bce0d commit 6944d6f
Showing 39 changed files with 4,089 additions and 1,958 deletions.
9 changes: 7 additions & 2 deletions metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
@@ -15,6 +15,8 @@ grant operate, usage on warehouse "<your-warehouse>" to role datahub_role;
grant usage on DATABASE "<your-database>" to role datahub_role;
grant usage on all schemas in database "<your-database>" to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;
grant select on all streams in database "<your-database>" to role datahub_role;
grant select on future streams in database "<your-database>" to role datahub_role;

// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
grant references on all tables in database "<your-database>" to role datahub_role;
@@ -50,9 +52,12 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
If the warehouse is already running during ingestion or has auto-resume enabled,
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
- `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream.
- If metadata is required only on some schemas, then you can grant the usage privileges only on a particular schema, like

```sql
grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
```
- `select` on `streams` is required for stream definitions to be available. It does not allow selecting the stream's data (which is not required) unless the underlying dataset has select access as well; see the sketch below for applying these grants programmatically.
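For completeness, here is a minimal sketch (not part of the commit) of applying the stream-related grants programmatically with snowflake-connector-python. The account, user, and database placeholders are illustrative, and the same approach applies to the other grants listed above:

```python
import snowflake.connector  # pip install snowflake-connector-python

# Illustrative placeholders; run under a role that is allowed to grant these privileges.
conn = snowflake.connector.connect(
    account="<your-account>",
    user="<admin-user>",
    password="<password>",
    role="ACCOUNTADMIN",
    warehouse="<your-warehouse>",
)

grants = [
    'grant select on all streams in database "<your-database>" to role datahub_role',
    'grant select on future streams in database "<your-database>" to role datahub_role',
]

cur = conn.cursor()
try:
    for stmt in grants:
        cur.execute(stmt)  # each grant is an independent statement
finally:
    cur.close()
    conn.close()
```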
@@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
SAC_LIVE_DATA_MODEL = "Live Data Model"
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"

# TODO: Create separate entity...
NOTEBOOK = "Notebook"
@@ -132,6 +132,7 @@ class Constant:
ACTIVE = "Active"
SQL_PARSING_FAILURE = "SQL Parsing Failure"
M_QUERY_NULL = '"null"'
REPORT_WEB_URL = "reportWebUrl"


@dataclass
31 changes: 28 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -582,8 +582,11 @@ def tile_custom_properties(tile: powerbi_data_classes.Tile) -> dict:
if tile.dataset is not None and tile.dataset.webUrl is not None:
custom_properties[Constant.DATASET_WEB_URL] = tile.dataset.webUrl

if tile.report is not None and tile.report.id is not None:
custom_properties[Constant.REPORT_ID] = tile.report.id
if tile.report_id is not None:
custom_properties[Constant.REPORT_ID] = tile.report_id

if tile.report is not None and tile.report.webUrl is not None:
custom_properties[Constant.REPORT_WEB_URL] = tile.report.webUrl

return custom_properties

@@ -1053,6 +1056,7 @@ def report_to_dashboard(
report: powerbi_data_classes.Report,
chart_mcps: List[MetadataChangeProposalWrapper],
user_mcps: List[MetadataChangeProposalWrapper],
dashboard_edges: List[EdgeClass],
) -> List[MetadataChangeProposalWrapper]:
"""
Map PowerBi report to Datahub dashboard
@@ -1074,6 +1078,7 @@
charts=chart_urn_list,
lastModified=ChangeAuditStamps(),
dashboardUrl=report.webUrl,
dashboards=dashboard_edges,
)

info_mcp = self.new_mcp(
@@ -1167,8 +1172,28 @@ def report_to_datahub_work_units(
ds_mcps = self.to_datahub_dataset(report.dataset, workspace)
chart_mcps = self.pages_to_chart(report.pages, workspace, ds_mcps)

# find all dashboards with a Tile referencing this report
downstream_dashboards_edges = []
for d in workspace.dashboards.values():
if any(t.report_id == report.id for t in d.tiles):
dashboard_urn = builder.make_dashboard_urn(
platform=self.__config.platform_name,
platform_instance=self.__config.platform_instance,
name=d.get_urn_part(),
)
edge = EdgeClass(
destinationUrn=dashboard_urn,
sourceUrn=None,
created=None,
lastModified=None,
properties=None,
)
downstream_dashboards_edges.append(edge)

# Let's convert report to datahub dashboard
report_mcps = self.report_to_dashboard(workspace, report, chart_mcps, user_mcps)
report_mcps = self.report_to_dashboard(
workspace, report, chart_mcps, user_mcps, downstream_dashboards_edges
)

# Now add MCPs in sequence
mcps.extend(ds_mcps)
@@ -286,11 +286,15 @@ class CreatedFrom(Enum):
id: str
title: str
embedUrl: str
dataset: Optional["PowerBIDataset"]
dataset_id: Optional[str]
report: Optional[Report]
report_id: Optional[str]
createdFrom: CreatedFrom

# In a first pass, `dataset_id` and/or `report_id` are filled in.
# In a subsequent pass, the objects are populated.
dataset: Optional["PowerBIDataset"]
report: Optional[Report]

def get_urn_part(self):
return f"charts.{self.id}"

@@ -337,41 +337,6 @@ def get_tiles(self, workspace: Workspace, dashboard: Dashboard) -> List[Tile]:
-tiles), there is no information available on pagination
"""

def new_dataset_or_report(tile_instance: Any) -> dict:
"""
Find out which is the data source for tile. It is either REPORT or DATASET
"""
report_fields = {
Constant.REPORT: (
self.get_report(
workspace=workspace,
report_id=tile_instance.get(Constant.REPORT_ID),
)
if tile_instance.get(Constant.REPORT_ID) is not None
else None
),
Constant.CREATED_FROM: Tile.CreatedFrom.UNKNOWN,
}

# reportId and datasetId are exclusive in tile_instance
# if datasetId is present that means tile is created from dataset
# if reportId is present that means tile is created from report
# if both i.e. reportId and datasetId are not present then tile is created from some visualization
if tile_instance.get(Constant.REPORT_ID) is not None:
report_fields[Constant.CREATED_FROM] = Tile.CreatedFrom.REPORT
elif tile_instance.get(Constant.DATASET_ID) is not None:
report_fields[Constant.CREATED_FROM] = Tile.CreatedFrom.DATASET
else:
report_fields[Constant.CREATED_FROM] = Tile.CreatedFrom.VISUALIZATION

title: Optional[str] = tile_instance.get(Constant.TITLE)
_id: Optional[str] = tile_instance.get(Constant.ID)
created_from: Any = report_fields[Constant.CREATED_FROM]
logger.info(f"Tile {title}({_id}) is created from {created_from}")

return report_fields

tile_list_endpoint: str = self.get_tiles_endpoint(
workspace, dashboard_id=dashboard.id
)
@@ -393,8 +358,18 @@ def new_dataset_or_report(tile_instance: Any) -> dict:
title=instance.get(Constant.TITLE),
embedUrl=instance.get(Constant.EMBED_URL),
dataset_id=instance.get(Constant.DATASET_ID),
report_id=instance.get(Constant.REPORT_ID),
dataset=None,
**new_dataset_or_report(instance),
report=None,
createdFrom=(
# In the past we considered that only one of the two report_id or dataset_id would be present
# but we have seen cases where both are present. If both are present, we prioritize the report.
Tile.CreatedFrom.REPORT
if instance.get(Constant.REPORT_ID)
else Tile.CreatedFrom.DATASET
if instance.get(Constant.DATASET_ID)
else Tile.CreatedFrom.VISUALIZATION
),
)
for instance in tile_dict
if instance is not None
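The `createdFrom` expression above encodes the priority described in the inline comment. A standalone, hypothetical restatement (not part of the commit; the enum values here are illustrative) makes the ordering explicit:

```python
from enum import Enum
from typing import Optional


class CreatedFrom(Enum):  # illustrative values, not necessarily the project's
    REPORT = "Report"
    DATASET = "Dataset"
    VISUALIZATION = "Visualization"


def created_from(report_id: Optional[str], dataset_id: Optional[str]) -> CreatedFrom:
    # A tile carrying both identifiers is treated as report-created.
    if report_id:
        return CreatedFrom.REPORT
    if dataset_id:
        return CreatedFrom.DATASET
    return CreatedFrom.VISUALIZATION


assert created_from("report-1", "dataset-1") is CreatedFrom.REPORT
assert created_from(None, "dataset-1") is CreatedFrom.DATASET
assert created_from(None, None) is CreatedFrom.VISUALIZATION
```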
@@ -625,13 +625,26 @@ def fill_dashboards() -> None:
dashboard.tiles = self._get_resolver().get_tiles(
workspace, dashboard=dashboard
)
# set the dataset for tiles
# set the dataset and the report for tiles
for tile in dashboard.tiles:
# In Power BI, dashboards, reports, and datasets are tightly scoped to the workspace they belong to.
# https://learn.microsoft.com/en-us/power-bi/collaborate-share/service-new-workspaces
if tile.report_id:
tile.report = workspace.reports.get(tile.report_id)
if tile.report is None:
self.reporter.info(
title="Missing Report Lineage For Tile",
message="A Report reference that failed to be resolved. Please ensure that 'extract_reports' is set to True in the configuration.",
context=f"workspace-name: {workspace.name}, tile-name: {tile.title}, report-id: {tile.report_id}",
)
# However, semantic models (aka datasets) can be shared across workspaces
# https://learn.microsoft.com/en-us/fabric/admin/portal-workspace#use-semantic-models-across-workspaces
# That's why the global 'dataset_registry' is required
if tile.dataset_id:
tile.dataset = self.dataset_registry.get(tile.dataset_id)
if tile.dataset is None:
self.reporter.info(
title="Missing Lineage For Tile",
title="Missing Dataset Lineage For Tile",
message="A cross-workspace reference that failed to be resolved. Please ensure that no global workspace is being filtered out due to the workspace_id_pattern.",
context=f"workspace-name: {workspace.name}, tile-name: {tile.title}, dataset-id: {tile.dataset_id}",
)
Expand All @@ -653,10 +666,10 @@ def fill_dashboard_tags() -> None:
for dashboard in workspace.dashboards.values():
dashboard.tags = workspace.dashboard_endorsements.get(dashboard.id, [])

# fill reports first since some dashboard may reference a report
fill_reports()
if self.__config.extract_dashboards:
fill_dashboards()

fill_reports()
fill_dashboard_tags()
self._fill_independent_datasets(workspace=workspace)

@@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
SCHEMA = "schema"
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
@@ -98,6 +98,11 @@ class SnowflakeFilterConfig(SQLFilterConfig):
)
# table_pattern and view_pattern are inherited from SQLFilterConfig

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@@ -274,6 +279,11 @@ class SnowflakeV2Config(
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

include_streams: bool = Field(
default=True,
description="If enabled, streams will be ingested as separate entities from tables/views.",
)
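The two new options work together: `include_streams` toggles stream extraction, and `stream_pattern` filters which streams are kept. A hedged sketch (not from the commit) of how the pattern is matched against fully qualified `database.schema.stream` names, assuming `AllowDenyPattern.allowed()` behaves as it does elsewhere in the codebase:

```python
from datahub.configuration.common import AllowDenyPattern

# Config fragment a user might supply; the regex is matched against the
# fully qualified stream name.
stream_pattern = AllowDenyPattern(allow=["Customer\\.public\\.customer.*"])
include_streams = True  # default per the field definition above

assert stream_pattern.allowed("Customer.public.customer_changes")
assert not stream_pattern.allowed("Sales.public.orders_stream")
```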

structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
@@ -49,6 +49,7 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
ObservedQuery,
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
@@ -241,7 +242,13 @@ def get_workunits_internal(
use_cached_audit_log = audit_log_file.exists()

queries: FileBackedList[
Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
Union[
KnownLineageMapping,
PreparsedQuery,
TableRename,
TableSwap,
ObservedQuery,
]
]
if use_cached_audit_log:
logger.info("Using cached audit log")
@@ -252,7 +259,13 @@

shared_connection = ConnectionWrapper(audit_log_file)
queries = FileBackedList(shared_connection)
entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
entry: Union[
KnownLineageMapping,
PreparsedQuery,
TableRename,
TableSwap,
ObservedQuery,
]

with self.report.copy_history_fetch_timer:
for entry in self.fetch_copy_history():
@@ -329,7 +342,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]:

def fetch_query_log(
self, users: UsersMapping
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap, ObservedQuery]]:
query_log_query = _build_enriched_query_log_query(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
@@ -362,7 +375,7 @@ def fetch_query_log(

def _parse_audit_log_row(
self, row: Dict[str, Any], users: UsersMapping
) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]:
) -> Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]]:
json_fields = {
"DIRECT_OBJECTS_ACCESSED",
"OBJECTS_MODIFIED",
@@ -398,6 +411,34 @@ def _parse_audit_log_row(
pass
else:
return None

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

# Use direct_objects_accessed instead of objects_modified
# objects_modified returns $SYS_VIEW_X with no mapping
has_stream_objects = any(
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
if has_stream_objects:
logger.debug("Found matching stream object")
return ObservedQuery(
query=res["query_text"],
session_id=res["session_id"],
timestamp=res["query_start_time"].astimezone(timezone.utc),
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
res["query_text"], self.identifiers.platform, fast=True
),
)

upstreams = []
column_usage = {}

@@ -460,12 +501,6 @@ def _parse_audit_log_row(
)
)

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)
