Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snowflake): set is_temp_table and is_allowed_table function for SqlParsingAggregator in SnowflakeV2Source #12438

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import os.path
import platform
import re
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Union

Expand Down Expand Up @@ -33,6 +34,7 @@
from datahub.ingestion.source.snowflake.constants import (
GENERIC_PERMISSION_ERROR_KEY,
SnowflakeEdition,
SnowflakeObjectDomain,
)
from datahub.ingestion.source.snowflake.snowflake_assertion import (
SnowflakeAssertionsHandler,
Expand Down Expand Up @@ -162,6 +164,8 @@
self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None

self.discovered_datasets: Optional[List[str]] = None

self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context(
SqlParsingAggregator(
platform=self.identifiers.platform,
Expand All @@ -182,6 +186,8 @@
generate_usage_statistics=False,
generate_operations=False,
format_queries=self.config.format_sql_queries,
is_temp_table=self._is_temp_table,
is_allowed_table=self._is_allowed_table,
)
)
self.report.sql_aggregator = self.aggregator.report
Expand Down Expand Up @@ -444,6 +450,34 @@

return _report

def _is_temp_table(self, name: str) -> bool:
if any(
re.match(pattern, name, flags=re.IGNORECASE)
for pattern in self.config.temporary_tables_pattern
):
return True

Check warning on line 458 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py#L458

Added line #L458 was not covered by tests

# This is also a temp table if
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe note that this logic mirrors the logic in snowflake_queries.py?

I'm not sure if there's a way to remove the code duplication, but that would be even better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@sgomezvillamor sgomezvillamor Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing that comes to mind is to give SnowflakeV2Source itself the responsibility of instantiating SqlParsingAggregator, so that SnowflakeQueriesExtractor receives it as a constructor parameter.
This way we would still instantiate SqlParsingAggregator twice, but is_temp_table and is_allowed_table would be defined only once, in SnowflakeV2Source.
WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time trying this, and it's getting complicated, so I'm leaving it as is.

# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list
if (
self.filters.is_dataset_pattern_allowed(name, SnowflakeObjectDomain.TABLE)
and self.discovered_datasets
and name not in self.discovered_datasets
):
return True

Check warning on line 469 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py#L469

Added line #L469 was not covered by tests

return False

def _is_allowed_table(self, name: str) -> bool:
if self.discovered_datasets and name not in self.discovered_datasets:
return False

Check warning on line 475 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py#L475

Added line #L475 was not covered by tests

return self.filters.is_dataset_pattern_allowed(
name, SnowflakeObjectDomain.TABLE
)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand Down Expand Up @@ -513,7 +547,7 @@
)
return

discovered_datasets = discovered_tables + discovered_views
self.discovered_datasets = discovered_tables + discovered_views

if self.config.use_queries_v2:
with self.report.new_stage(f"*: {VIEW_PARSING}"):
Expand All @@ -538,13 +572,14 @@
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
discovered_tables=self.discovered_datasets,
graph=self.ctx.graph,
)

# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
# but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors,
# it should be pretty straightforward to refactor this and only initialize the aggregator once.
# This also applies to the _is_temp_table and _is_allowed_table methods above, which are duplicated from SnowflakeQueriesExtractor.
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
queries_extractor.close()
Expand All @@ -568,12 +603,14 @@
if (
self.config.include_usage_stats or self.config.include_operational_stats
) and self.usage_extractor:
yield from self.usage_extractor.get_usage_workunits(discovered_datasets)
yield from self.usage_extractor.get_usage_workunits(
self.discovered_datasets
)

if self.config.include_assertion_results:
yield from SnowflakeAssertionsHandler(
self.config, self.report, self.connection, self.identifiers
).get_assertion_workunits(discovered_datasets)
).get_assertion_workunits(self.discovered_datasets)

self.connection.close()

Expand Down
Loading