Skip to content

Commit

Permalink
Merge branch 'master' into snowflake-streams-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
brock-acryl authored Jan 24, 2025
2 parents 5199366 + 0f538d8 commit 150b12a
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export default function DataProductsTab() {
searchFlags: { skipCache: true },
},
},
fetchPolicy: 'no-cache',
});
const totalResults = data?.searchAcrossEntities?.total || 0;
const searchResults = data?.searchAcrossEntities?.searchResults?.map((r) => r.entity) || [];
Expand Down
2 changes: 1 addition & 1 deletion docs/actions/sources/datahub-cloud-event-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ If you've configured your Action pipeline `failure_mode` to be `THROW`, then eve

The DataHub Cloud Event Source produces

- [Entity Change Event V1](../events/entity-change-event.md)
- [Entity Change Event V1](../../managed-datahub/datahub-api/entity-events-api.md)

Note that the DataHub Cloud Event Source does _not_ yet support the full [Metadata Change Log V1](../events/metadata-change-log-event.md) event stream.

Expand Down
1 change: 1 addition & 0 deletions docs/managed-datahub/datahub-api/entity-events-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The Events API allows you to integrate changes happening on the DataHub Metadata
### Supported Integrations

* [AWS EventBridge](docs/managed-datahub/operator-guide/setting-up-events-api-on-aws-eventbridge.md)
* [DataHub Cloud Event Source](docs/actions/sources/datahub-cloud-event-source.md)

### Use Cases

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,15 +541,25 @@ def fetch_secure_view_definition(
try:
view_definitions = self.data_dictionary.get_secure_view_definitions()
return view_definitions[db_name][schema_name][table_name]
except KeyError:
# Received secure view definitions but the view is not present in results
self.structured_reporter.info(
title="Secure view definition not found",
message="Lineage will be missing for the view.",
context=f"{db_name}.{schema_name}.{table_name}",
)
return None
except Exception as e:
if isinstance(e, SnowflakePermissionError):
error_msg = (
"Failed to get secure views definitions. Please check permissions."
)
else:
error_msg = "Failed to get secure views definitions"
action_msg = (
"Please check permissions."
if isinstance(e, SnowflakePermissionError)
else ""
)

self.structured_reporter.warning(
error_msg,
title="Failed to get secure views definitions",
message=f"Lineage will be missing for the view. {action_msg}",
context=f"{db_name}.{schema_name}.{table_name}",
exc=e,
)
return None
Expand Down
48 changes: 5 additions & 43 deletions metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
)
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
DatasetPropertiesClass,
DatasetSnapshotClass,
UpstreamClass,
)
Expand Down Expand Up @@ -418,41 +417,11 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
dataset_snapshot: DatasetSnapshotClass = wu.metadata.proposedSnapshot
assert dataset_snapshot

lineage_mcp, lineage_properties_aspect = self.get_lineage_mcp(
wu.metadata.proposedSnapshot.urn
)
lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)

if lineage_mcp is not None:
yield lineage_mcp.as_workunit()

if lineage_properties_aspect:
aspects = dataset_snapshot.aspects
if aspects is None:
aspects = []

dataset_properties_aspect: Optional[DatasetPropertiesClass] = None

for aspect in aspects:
if isinstance(aspect, DatasetPropertiesClass):
dataset_properties_aspect = aspect

if dataset_properties_aspect is None:
dataset_properties_aspect = DatasetPropertiesClass()
aspects.append(dataset_properties_aspect)

custom_properties = (
{
**dataset_properties_aspect.customProperties,
**lineage_properties_aspect.customProperties,
}
if dataset_properties_aspect.customProperties
else lineage_properties_aspect.customProperties
)
dataset_properties_aspect.customProperties = custom_properties
dataset_snapshot.aspects = aspects

dataset_snapshot.aspects.append(dataset_properties_aspect)

# Emit the work unit from super.
yield wu

Expand Down Expand Up @@ -656,19 +625,16 @@ def _populate_lineage(self) -> None:

def get_lineage_mcp(
self, dataset_urn: str
) -> Tuple[
Optional[MetadataChangeProposalWrapper], Optional[DatasetPropertiesClass]
]:
) -> Optional[MetadataChangeProposalWrapper]:
dataset_key = mce_builder.dataset_urn_to_key(dataset_urn)
if dataset_key is None:
return None, None
return None

if not self._lineage_map:
self._populate_lineage()
assert self._lineage_map is not None

upstream_lineage: List[UpstreamClass] = []
custom_properties: Dict[str, str] = {}

if dataset_key.name in self._lineage_map:
item = self._lineage_map[dataset_key.name]
Expand All @@ -684,16 +650,12 @@ def get_lineage_mcp(
)
upstream_lineage.append(upstream_table)

properties = None
if custom_properties:
properties = DatasetPropertiesClass(customProperties=custom_properties)

if not upstream_lineage:
return None, properties
return None

mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(upstreams=upstream_lineage),
)

return mcp, properties
return mcp
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ async def get_github_stats():
async with aiohttp.ClientSession(
headers={"Accept": "application/vnd.github.v3+json"}
) as session:
gh_url = "https://api.github.com/repos/datahub-project/datahub/releases"
gh_url = "https://api.github.com/repos/datahub-project/datahub/releases/latest"
async with session.get(gh_url) as gh_response:
gh_response_json = await gh_response.json()
latest_server_version = Version(gh_response_json[0].get("tag_name"))
latest_server_date = gh_response_json[0].get("published_at")
latest_server_version = Version(gh_response_json.get("tag_name"))
latest_server_date = gh_response_json.get("published_at")
return (latest_server_version, latest_server_date)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,63 @@ def test_snowflake_missing_snowflake_operations_permission_causes_pipeline_failu
assert "usage-permission-error" in [
failure.message for failure in pipeline.source.get_report().failures
]


@freeze_time(FROZEN_TIME)
def test_snowflake_missing_snowflake_secure_view_definitions_raises_pipeline_info(
    pytestconfig,
    snowflake_pipeline_config,
):
    """When the secure-view-definitions query succeeds but returns no rows,
    the source should report a structured *info* (not a warning/failure) and
    the pipeline should still pass raise_from_status with warnings raised.
    """
    with mock.patch("snowflake.connector.connect") as mock_connect:
        connection = mock.MagicMock()
        cursor = mock.MagicMock()
        mock_connect.return_value = connection
        connection.cursor.return_value = cursor

        # Simulate a successful query whose result set for secure view
        # definitions is empty.
        cursor.execute.side_effect = query_permission_response_override(
            default_query_results,
            [snowflake_query.SnowflakeQuery.get_secure_view_definitions()],
            [],
        )

        pipeline = Pipeline(snowflake_pipeline_config)
        pipeline.run()
        pipeline.raise_from_status(raise_warnings=True)

        expected_infos = [
            {
                "title": "Secure view definition not found",
                "message": "Lineage will be missing for the view.",
                "context": ["TEST_DB.TEST_SCHEMA.VIEW_1"],
            }
        ]
        assert pipeline.source.get_report().infos.as_obj() == expected_infos


@freeze_time(FROZEN_TIME)
def test_snowflake_failed_secure_view_definitions_query_raises_pipeline_warning(
    pytestconfig,
    snowflake_pipeline_config,
):
    """A permission error on the secure-view-definitions query should be
    reported as a pipeline *warning* whose message tells the user to check
    permissions, with the failing view and exception in the context.
    """
    with mock.patch("snowflake.connector.connect") as mock_connect:
        connection = mock.MagicMock()
        cursor = mock.MagicMock()
        mock_connect.return_value = connection
        connection.cursor.return_value = cursor

        # Make the secure-view-definitions query raise a permission error.
        cursor.execute.side_effect = query_permission_error_override(
            default_query_results,
            [snowflake_query.SnowflakeQuery.get_secure_view_definitions()],
            "Database 'SNOWFLAKE' does not exist or not authorized.",
        )

        pipeline = Pipeline(snowflake_pipeline_config)
        pipeline.run()

        expected_warnings = [
            {
                "title": "Failed to get secure views definitions",
                "message": "Lineage will be missing for the view. Please check permissions.",
                "context": [
                    "TEST_DB.TEST_SCHEMA.VIEW_1 <class 'datahub.ingestion.source.snowflake.snowflake_connection.SnowflakePermissionError'>: Database 'SNOWFLAKE' does not exist or not authorized."
                ],
            }
        ]
        assert pipeline.source.get_report().warnings.as_obj() == expected_warnings

0 comments on commit 150b12a

Please sign in to comment.