diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e6319f668ecb8c..692da5db731bfd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from datetime import datetime from functools import lru_cache +from itertools import groupby from typing import ( Any, Dict, @@ -22,6 +23,7 @@ from sqlalchemy.engine import Engine from sqlalchemy.engine.base import Connection from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.pool import QueuePool from sqlalchemy.sql.expression import text from teradatasqlalchemy.dialect import TeradataDialect from teradatasqlalchemy.options import configure @@ -58,7 +60,6 @@ from datahub.metadata.schema_classes import SchemaMetadataClass from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage -from datahub.utilities.groupby import groupby_unsorted logger: logging.Logger = logging.getLogger(__name__) @@ -286,7 +287,7 @@ def grouper(fk_row): # TODO: Check if there's a better way fk_dicts = list() - for constraint_info, constraint_cols in groupby_unsorted(res, grouper): + for constraint_info, constraint_cols in groupby(res, grouper): fk_dict = { "name": str(constraint_info["name"]), "constrained_columns": list(), @@ -599,12 +600,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_columns", - lambda self, - connection, - table_name, - schema=None, - use_qvci=self.config.use_qvci, - **kw: optimized_get_columns( + lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns( self, connection, table_name, @@ -618,11 +614,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_pk_constraint", - lambda self, - connection, - table_name, - schema=None, - **kw: optimized_get_pk_constraint( + lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint( self, connection, table_name, schema, **kw ), ) @@ -630,11 +622,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_foreign_keys", - lambda self, - connection, - table_name, - schema=None, - **kw: optimized_get_foreign_keys( + lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys( self, connection, table_name, schema, **kw ), ) @@ -705,6 +693,12 @@ def get_inspectors(self): # This method can be overridden in the case that you want to dynamically # run on multiple databases. url = self.config.get_sql_alchemy_url() + + # replace pooling with pooling for teradata + if "max_overflow" in self.config.options: + self.config.options.pop("max_overflow") + self.config.options["poolclass"] = QueuePool + logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) with engine.connect() as conn: