Skip to content

Commit

Permalink
Update teradata.py
Browse files Browse the repository at this point in the history
Teradata sqlalchemy connector does not support max_overflow. replaced with poolclass=QueuePool.
  • Loading branch information
brock-acryl committed Jan 30, 2025
1 parent a155470 commit 5a6266f
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from itertools import groupby

Check warning on line 6 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L6

Added line #L6 was not covered by tests
from typing import (
Any,
Dict,
Expand All @@ -22,6 +23,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.pool import QueuePool

Check warning on line 26 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L26

Added line #L26 was not covered by tests
from sqlalchemy.sql.expression import text
from teradatasqlalchemy.dialect import TeradataDialect
from teradatasqlalchemy.options import configure
Expand Down Expand Up @@ -58,7 +60,6 @@
from datahub.metadata.schema_classes import SchemaMetadataClass
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.utilities.groupby import groupby_unsorted

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -286,7 +287,7 @@ def grouper(fk_row):

# TODO: Check if there's a better way
fk_dicts = list()
for constraint_info, constraint_cols in groupby_unsorted(res, grouper):
for constraint_info, constraint_cols in groupby(res, grouper):

Check warning on line 290 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L290

Added line #L290 was not covered by tests
fk_dict = {
"name": str(constraint_info["name"]),
"constrained_columns": list(),
Expand Down Expand Up @@ -599,12 +600,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
setattr( # noqa: B010
TeradataDialect,
"get_columns",
lambda self,
connection,
table_name,
schema=None,
use_qvci=self.config.use_qvci,
**kw: optimized_get_columns(
lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns(
self,
connection,
table_name,
Expand All @@ -618,23 +614,15 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
setattr( # noqa: B010
TeradataDialect,
"get_pk_constraint",
lambda self,
connection,
table_name,
schema=None,
**kw: optimized_get_pk_constraint(
lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint(
self, connection, table_name, schema, **kw
),
)

setattr( # noqa: B010
TeradataDialect,
"get_foreign_keys",
lambda self,
connection,
table_name,
schema=None,
**kw: optimized_get_foreign_keys(
lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys(
self, connection, table_name, schema, **kw
),
)
Expand Down Expand Up @@ -705,6 +693,12 @@ def get_inspectors(self):
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()

# replace pooling with pooling for teradata
if "max_overflow" in self.config.options:
self.config.options.pop("max_overflow")
self.config.options["poolclass"] = QueuePool

Check warning on line 700 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L698-L700

Added lines #L698 - L700 were not covered by tests

logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
Expand Down

0 comments on commit 5a6266f

Please sign in to comment.