Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/teradata): teradata profiling fix for pooling #12507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
)
self.report.sql_aggregator = self.aggregator.report

def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
"""Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults."""
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)

@classmethod
def test_connection(cls, config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
Expand Down Expand Up @@ -519,12 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
# Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true
sqlalchemy_log._add_default_handler = lambda x: None # type: ignore

# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)
self._add_default_options(sql_config)

for inspector in self.get_inspectors():
profiler = None
Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.pool import QueuePool

Check warning on line 25 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L25

Added line #L25 was not covered by tests
from sqlalchemy.sql.expression import text
from teradatasqlalchemy.dialect import TeradataDialect
from teradatasqlalchemy.options import configure
Expand Down Expand Up @@ -678,6 +679,16 @@
if self.config.stateful_ingestion:
self.config.stateful_ingestion.remove_stale_metadata = False

def _add_default_options(self, sql_config: SQLCommonConfig) -> None:

Check warning on line 682 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L682

Added line #L682 was not covered by tests
"""Add Teradata-specific default options"""
super()._add_default_options(sql_config)
if sql_config.is_profiling_enabled():

Check warning on line 685 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L684-L685

Added lines #L684 - L685 were not covered by tests
# Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool.
# SingletonThreadPool does not support parellel connections. For using profiling, we need to use QueuePool.
# https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration
# https://github.com/Teradata/sqlalchemy-teradata/issues/96
sql_config.options.setdefault("poolclass", QueuePool)

Check warning on line 690 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L690

Added line #L690 was not covered by tests
brock-acryl marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def create(cls, config_dict, ctx):
config = TeradataConfig.parse_obj(config_dict)
Expand Down Expand Up @@ -705,6 +716,7 @@
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()

logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
Expand Down
Loading