Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): Create all structured propery templates before assignation #12469

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ class SnowflakeV2Config(
description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.",
)

structured_properties_template_cache_invalidation_interval: int = Field(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe hide this one from docs, feels more like an implementation detail

hidden_from_docs=True,
default=60,
description="Interval in seconds to invalidate the structured properties template cache.",
)

include_external_url: bool = Field(
default=True,
description="Whether to populate Snowsight url for Snowflake Objects",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@
and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
order by table_schema, table_name"""

@staticmethod
def get_all_tags():
return """

Check warning on line 164 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py#L164

Added line #L164 was not covered by tests
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
FROM snowflake.account_usage.tag_references
GROUP BY TAG_DATABASE , TAG_SCHEMA, tag_name
ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC;
"""

@staticmethod
def get_all_tags_on_object_with_propagation(
db_name: str, quoted_identifier: str, domain: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class SnowflakeV2Report(
num_tables_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0
num_structured_property_templates_created: int = 0

data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,23 @@

return secure_view_definitions

def get_all_tags(self) -> List[SnowflakeTag]:
cur = self.connection.query(

Check warning on line 289 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L289

Added line #L289 was not covered by tests
SnowflakeQuery.get_all_tags(),
)

tags = [

Check warning on line 293 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L293

Added line #L293 was not covered by tests
SnowflakeTag(
database=tag["TAG_DATABASE"],
schema=tag["TAG_SCHEMA"],
name=tag["TAG_NAME"],
value="",
)
for tag in cur
]

return tags

Check warning on line 303 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L303

Added line #L303 was not covered by tests

@serialized_lru_cache(maxsize=1)
def get_tables_for_database(
self, db_name: str
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import itertools
import logging
import time
from typing import Dict, Iterable, List, Optional, Union

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
get_sys_time,
make_data_platform_urn,
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
Expand Down Expand Up @@ -74,7 +74,6 @@
PROFILING,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
GlobalTags,
Status,
SubTypes,
Expand All @@ -101,15 +100,8 @@
StringType,
TimeType,
)
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)
Expand Down Expand Up @@ -191,7 +183,7 @@
self.domain_registry: Optional[DomainRegistry] = domain_registry
self.classification_handler = ClassificationHandler(self.config, self.report)
self.tag_extractor = SnowflakeTagExtractor(
config, self.data_dictionary, self.report
config, self.data_dictionary, self.report, identifiers
)
self.profiler: Optional[SnowflakeProfiler] = profiler
self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = (
Expand All @@ -217,6 +209,16 @@
return self.identifiers.snowflake_identifier(identifier)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.extract_tags_as_structured_properties:
logger.info("Creating structured property templates for tags")
yield from self.tag_extractor.create_structured_property_templates()

Check warning on line 214 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L212-L214

Added lines #L212 - L214 were not covered by tests
# We have to wait until cache invalidates to make sure the structured property template is available
logger.info(

Check warning on line 216 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L216

Added line #L216 was not covered by tests
f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate"
)
time.sleep(

Check warning on line 219 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L219

Added line #L219 was not covered by tests
self.config.structured_properties_template_cache_invalidation_interval
)
self.databases = []
for database in self.get_databases() or []:
self.report.report_entity_scanned(database.name, "database")
Expand Down Expand Up @@ -698,6 +700,7 @@

def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
use_sp = self.config.extract_tags_as_structured_properties

identifier = (
self.snowflake_identifier(tag.structured_property_identifier())
if use_sp
Expand All @@ -708,10 +711,11 @@
return

self.report.report_tag_processed(identifier)

if use_sp:
yield from self.gen_tag_as_structured_property_workunits(tag)
else:
yield from self.gen_tag_workunits(tag)
return

Check warning on line 716 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L716

Added line #L716 was not covered by tests

yield from self.gen_tag_workunits(tag)

Check warning on line 718 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L718

Added line #L718 was not covered by tests

def _format_tags_as_structured_properties(
self, tags: List[SnowflakeTag]
Expand All @@ -732,6 +736,7 @@
if table.tags:
for tag in table.tags:
yield from self._process_tag(tag)

for column_name in table.column_tags:
for tag in table.column_tags[column_name]:
yield from self._process_tag(tag)
Expand Down Expand Up @@ -903,29 +908,6 @@
entityUrn=tag_urn, aspect=tag_properties_aspect
).as_workunit()

def gen_tag_as_structured_property_workunits(
self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
identifier = self.snowflake_identifier(tag.structured_property_identifier())
urn = StructuredPropertyUrn(identifier).urn()
aspect = StructuredPropertyDefinition(
qualifiedName=identifier,
displayName=tag.name,
valueType=DataTypeUrn("datahub.string").urn(),
entityTypes=[
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
],
lastModified=AuditStamp(
time=get_sys_time(), actor="urn:li:corpuser:datahub"
),
)
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=aspect,
).as_workunit()

def gen_column_tags_as_structured_properties(
self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView]
) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
from typing import Dict, List, Optional
from typing import Dict, Iterable, List, Optional

from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
Expand All @@ -12,7 +15,22 @@
SnowflakeTag,
_SnowflakeTagCache,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,11 +41,12 @@
config: SnowflakeV2Config,
data_dictionary: SnowflakeDataDictionary,
report: SnowflakeV2Report,
snowflake_identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.data_dictionary = data_dictionary
self.report = report

self.snowflake_identifiers = snowflake_identifiers

Check warning on line 49 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L49

Added line #L49 was not covered by tests
self.tag_cache: Dict[str, _SnowflakeTagCache] = {}

def _get_tags_on_object_without_propagation(
Expand Down Expand Up @@ -59,6 +78,41 @@
raise ValueError(f"Unknown domain {domain}")
return tags

def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
for tag in self.data_dictionary.get_all_tags():
if not self.config.structured_property_pattern.allowed(

Check warning on line 83 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L82-L83

Added lines #L82 - L83 were not covered by tests
tag.tag_identifier()
):
continue
if self.config.extract_tags_as_structured_properties:
self.report.num_structured_property_templates_created += 1
yield from self.gen_tag_as_structured_property_workunits(tag)

Check warning on line 89 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L86-L89

Added lines #L86 - L89 were not covered by tests

def gen_tag_as_structured_property_workunits(
self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
identifier = self.snowflake_identifiers.snowflake_identifier(

Check warning on line 94 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L94

Added line #L94 was not covered by tests
tag.structured_property_identifier()
)
urn = StructuredPropertyUrn(identifier).urn()
aspect = StructuredPropertyDefinition(

Check warning on line 98 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L97-L98

Added lines #L97 - L98 were not covered by tests
qualifiedName=identifier,
displayName=tag.name,
valueType=DataTypeUrn("datahub.string").urn(),
entityTypes=[
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
],
lastModified=AuditStamp(
time=get_sys_time(), actor="urn:li:corpuser:datahub"
),
)
yield MetadataChangeProposalWrapper(

Check warning on line 111 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L111

Added line #L111 was not covered by tests
entityUrn=urn,
aspect=aspect,
).as_workunit()

def _get_tags_on_object_with_propagation(
self,
domain: str,
Expand Down
22 changes: 21 additions & 1 deletion metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,27 @@ def default_query_results( # noqa: C901
),
]:
return []

elif query == snowflake_query.SnowflakeQuery.get_all_tags():
return [
*[
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": f"my_tag_{ix}",
}
for ix in range(3)
],
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": "security",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
},
]
elif (
query
== snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties(
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
structured_properties_template_cache_invalidation_interval=0,
),
),
sink=DynamicTypedConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny():
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
extract_tags_as_structured_properties=True,
structured_properties_template_cache_invalidation_interval=0,
tag_pattern=AllowDenyPattern(
deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"]
),
Expand Down Expand Up @@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny():
source_report = pipeline.source.get_report()
assert isinstance(source_report, SnowflakeV2Report)
assert source_report.tags_scanned == 5
assert source_report._processed_tags == {
assert sorted(list(source_report._processed_tags)) == [
"snowflake.other_db.other_schema.my_other_tag",
"snowflake.test_db.test_schema.security",
}
]
Loading