Skip to content

Commit

Permalink
fix(mssql): adds missing containers and browsepathsv2 for dataflow an…
Browse files Browse the repository at this point in the history
…d datajob (#12483)
  • Loading branch information
sgomezvillamor authored Jan 31, 2025
1 parent 301d628 commit a7598ca
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp_builder import (
DatabaseKey,
SchemaKey,
)
from datahub.metadata.schema_classes import (
ContainerClass,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
Expand Down Expand Up @@ -171,11 +176,7 @@ def urn(self) -> str:
flow_id=self.entity.flow.formatted_name,
job_id=self.entity.formatted_name,
cluster=self.entity.flow.cluster,
platform_instance=(
self.entity.flow.platform_instance
if self.entity.flow.platform_instance
else None
),
platform_instance=self.entity.flow.platform_instance,
)

def add_property(
Expand Down Expand Up @@ -222,6 +223,26 @@ def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClas
)
return None

@property
def as_container_aspect(self) -> ContainerClass:
    """Build the ``container`` aspect linking this entity to its parent container.

    A ``StoredProcedure`` entity is keyed with a ``SchemaKey`` (using the
    entity's own ``schema``); every other entity is keyed with a
    ``DatabaseKey``. Both keys share the platform, platform instance, env
    and database taken from the owning flow.

    Returns:
        A ``ContainerClass`` whose ``container`` is the URN produced by the
        selected key.
    """
    flow = self.entity.flow
    # Fields common to both the schema-level and database-level keys.
    shared_fields = {
        "platform": flow.orchestrator,
        "instance": flow.platform_instance,
        "env": flow.env,
        "database": flow.db,
    }

    if isinstance(self.entity, StoredProcedure):
        parent_key = SchemaKey(schema=self.entity.schema, **shared_fields)
    else:
        parent_key = DatabaseKey(**shared_fields)

    return ContainerClass(container=parent_key.as_urn())


@dataclass
class MSSQLDataFlow:
Expand All @@ -244,9 +265,7 @@ def urn(self) -> str:
orchestrator=self.entity.orchestrator,
flow_id=self.entity.formatted_name,
cluster=self.entity.cluster,
platform_instance=(
self.entity.platform_instance if self.entity.platform_instance else None
),
platform_instance=self.entity.platform_instance,
)

@property
Expand All @@ -267,3 +286,13 @@ def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClas
),
)
return None

@property
def as_container_aspect(self) -> ContainerClass:
    """Build the ``container`` aspect linking this flow to a database-keyed container.

    The key is a ``DatabaseKey`` built from the entity's orchestrator
    (used as the platform), platform instance, env and db.

    Returns:
        A ``ContainerClass`` whose ``container`` is the URN of that key.
    """
    flow = self.entity
    container_urn = DatabaseKey(
        platform=flow.orchestrator,
        instance=flow.platform_instance,
        env=flow.env,
        database=flow.db,
    ).as_urn()
    return ContainerClass(container=container_urn)
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
default=True,
description="Enable lineage extraction for stored procedures",
)
include_containers_for_pipelines: bool = Field(
default=False,
description="Enable the container aspects ingestion for both pipelines and tasks. Note that this feature requires the corresponding model support in the backend, which was introduced in version 0.15.0.1.",
)

@pydantic.validator("uri_args")
def passwords_match(cls, v, values, **kwargs):
Expand Down Expand Up @@ -641,6 +645,12 @@ def construct_job_workunits(
aspect=data_platform_instance_aspect,
).as_workunit()

if self.config.include_containers_for_pipelines:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_job.as_container_aspect,
).as_workunit()

if include_lineage:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
Expand Down Expand Up @@ -683,6 +693,13 @@ def construct_flow_workunits(
entityUrn=data_flow.urn,
aspect=data_platform_instance_aspect,
).as_workunit()

if self.config.include_containers_for_pipelines:
yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_flow.as_container_aspect,
).as_workunit()

# TODO: Add SubType when it appears

def get_inspectors(self) -> Iterable[Inspector]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80",
"job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000",
"date_created": "2025-01-31 08:02:41.167000",
"date_modified": "2025-01-31 08:02:41.360000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand Down Expand Up @@ -2279,8 +2279,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
"date_created": "2025-01-31 08:02:40.980000",
"date_modified": "2025-01-31 08:02:40.980000"
},
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
Expand Down Expand Up @@ -2329,8 +2329,8 @@
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
"date_created": "2025-01-31 08:02:40.987000",
"date_modified": "2025-01-31 08:02:40.987000"
},
"name": "DemoData.Foo.NewProc",
"type": {
Expand Down Expand Up @@ -4969,7 +4969,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735588784503,
"time": 1738310563767,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down Expand Up @@ -5092,7 +5092,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735588784511,
"time": 1738310563770,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "f5a6c120-500a-4300-9b21-0c3225af1f80",
"job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000",
"date_created": "2025-01-31 08:02:41.167000",
"date_modified": "2025-01-31 08:02:41.360000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand Down Expand Up @@ -2279,8 +2279,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-12-30 19:59:24.690000",
"date_modified": "2024-12-30 19:59:24.690000"
"date_created": "2025-01-31 08:02:40.980000",
"date_modified": "2025-01-31 08:02:40.980000"
},
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
Expand Down Expand Up @@ -2694,7 +2694,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1735588789629,
"time": 1738310565884,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Loading

0 comments on commit a7598ca

Please sign in to comment.