diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index a0bd9ce0760bd1..5b1b9b1c2952c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -352,6 +352,15 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str) ) self.report.sql_aggregator = self.aggregator.report + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults.""" + # Extra default SQLAlchemy option for better connection pooling and threading. + # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow + if sql_config.is_profiling_enabled(): + sql_config.options.setdefault( + "max_overflow", sql_config.profiling.max_workers + ) + @classmethod def test_connection(cls, config_dict: dict) -> TestConnectionReport: test_report = TestConnectionReport() @@ -519,12 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit # Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true sqlalchemy_log._add_default_handler = lambda x: None # type: ignore - # Extra default SQLAlchemy option for better connection pooling and threading. 
- # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow - if sql_config.is_profiling_enabled(): - sql_config.options.setdefault( - "max_overflow", sql_config.profiling.max_workers - ) + self._add_default_options(sql_config) for inspector in self.get_inspectors(): profiler = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e6319f668ecb8c..c52eceb726955e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -22,6 +22,7 @@ from sqlalchemy.engine import Engine from sqlalchemy.engine.base import Connection from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.pool import QueuePool from sqlalchemy.sql.expression import text from teradatasqlalchemy.dialect import TeradataDialect from teradatasqlalchemy.options import configure @@ -678,6 +679,16 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): if self.config.stateful_ingestion: self.config.stateful_ingestion.remove_stale_metadata = False + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add Teradata-specific default options""" + super()._add_default_options(sql_config) + if sql_config.is_profiling_enabled(): + # Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool. + # SingletonThreadPool does not support parallel connections. To use profiling, we need to use QueuePool. 
+ # https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration + # https://github.com/Teradata/sqlalchemy-teradata/issues/96 + sql_config.options.setdefault("poolclass", QueuePool) + @classmethod def create(cls, config_dict, ctx): config = TeradataConfig.parse_obj(config_dict) @@ -705,6 +716,7 @@ def get_inspectors(self): # This method can be overridden in the case that you want to dynamically # run on multiple databases. url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) with engine.connect() as conn: