From bc09b28a649d591a45ec28008d3b6038ecb5fcd8 Mon Sep 17 00:00:00 2001
From: cccs-cat001 <56204545+cccs-cat001@users.noreply.github.com>
Date: Thu, 30 Jan 2025 00:56:35 -0500
Subject: [PATCH] refactor(operation_config): change logging on
 is_profiling_enabled (#12416)

Co-authored-by: Harshal Sheth
---
 metadata-ingestion/setup.py                          | 12 +++++++-----
 .../src/datahub/ingestion/source/sql/sql_config.py   | 10 ----------
 .../ingestion/source_config/operation_config.py      |  9 +++++++++
 3 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index e603b5f6ac1d3..f7e6482fd26f8 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -251,6 +251,7 @@
     # Iceberg Python SDK
     # Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency
     "pyiceberg>=0.4.0",
+    *cachetools_lib,
 }
 
 mssql_common = {
@@ -407,13 +408,14 @@
     # UnsupportedProductError
     # https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0
     # https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
-    "elasticsearch": {"elasticsearch==7.13.4"},
+    "elasticsearch": {"elasticsearch==7.13.4", *cachetools_lib},
     "cassandra": {
         "cassandra-driver>=3.28.0",
         # We were seeing an error like this `numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject`
         # with numpy 2.0. This likely indicates a mismatch between scikit-learn and numpy versions.
         # https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility
         "numpy<2",
+        *cachetools_lib,
     },
     "feast": {
         "feast>=0.34.0,<1",
@@ -425,7 +427,7 @@
         "numpy<2",
     },
     "grafana": {"requests"},
-    "glue": aws_common,
+    "glue": aws_common | cachetools_lib,
     # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
     "hana": sql_common
     | {
@@ -482,11 +484,11 @@
     | classification_lib
     | {"db-dtypes"}  # Pandas extension data types
     | cachetools_lib,
-    "s3": {*s3_base, *data_lake_profiling},
+    "s3": {*s3_base, *data_lake_profiling, *cachetools_lib},
     "gcs": {*s3_base, *data_lake_profiling},
-    "abs": {*abs_base, *data_lake_profiling},
+    "abs": {*abs_base, *data_lake_profiling, *cachetools_lib},
     "sagemaker": aws_common,
-    "salesforce": {"simple-salesforce"},
+    "salesforce": {"simple-salesforce", *cachetools_lib},
     "snowflake": snowflake_common | usage_common | sqlglot_lib,
     "snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
     "snowflake-queries": snowflake_common | usage_common | sqlglot_lib,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
index 7d82d99412ffe..3ead59eed2d39 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
@@ -2,8 +2,6 @@
 from abc import abstractmethod
 from typing import Any, Dict, Optional
 
-import cachetools
-import cachetools.keys
 import pydantic
 from pydantic import Field
 from sqlalchemy.engine import URL
@@ -29,7 +27,6 @@
     StatefulIngestionConfigBase,
 )
 from datahub.ingestion.source_config.operation_config import is_profiling_enabled
-from datahub.utilities.cachetools_keys import self_methodkey
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -118,13 +115,6 @@ class SQLCommonConfig(
     # Custom Stateful Ingestion settings
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
 
-    # TRICKY: The operation_config is time-dependent. Because we don't want to change
-    # whether or not we're running profiling mid-ingestion, we cache the result of this method.
-    # TODO: This decorator should be moved to the is_profiling_enabled(operation_config) method.
-    @cachetools.cached(
-        cache=cachetools.LRUCache(maxsize=1),
-        key=self_methodkey,
-    )
     def is_profiling_enabled(self) -> bool:
         return self.profiling.enabled and is_profiling_enabled(
             self.profiling.operation_config
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py b/metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py
index a670173aa4751..1846dcb4fdd3d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py
@@ -2,10 +2,12 @@
 import logging
 from typing import Any, Dict, Optional
 
+import cachetools
 import pydantic
 from pydantic.fields import Field
 
 from datahub.configuration.common import ConfigModel
+from datahub.utilities.cachetools_keys import self_methodkey
 
 logger = logging.getLogger(__name__)
 
@@ -62,6 +64,13 @@ def validate_profile_date_of_month(cls, v: Optional[int]) -> Optional[int]:
         return profile_date_of_month
 
 
+# TRICKY: The operation_config is time-dependent. Because we don't want to change
+# whether or not we're running profiling mid-ingestion, we cache the result of this method.
+# An additional benefit is that we only print the log lines on the first call.
+@cachetools.cached(
+    cache=cachetools.LRUCache(maxsize=1),
+    key=self_methodkey,
+)
 def is_profiling_enabled(operation_config: OperationConfig) -> bool:
     if operation_config.lower_freq_profile_enabled is False:
         return True
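
Note: the point of this patch is to move the cachetools memoization from the
SQLCommonConfig.is_profiling_enabled method onto the module-level
is_profiling_enabled(operation_config) function (resolving the TODO carried by
the deleted comment), and to add cachetools_lib to the extras that now need it
(iceberg, elasticsearch, cassandra, glue, s3, abs, salesforce). Because the
function is cached with maxsize=1 and keyed on the config object, the
time-dependent check is evaluated once per run and its log lines are emitted
only on the first call.

Below is a minimal, self-contained sketch of that mechanism, runnable outside
DataHub. Assumptions: the self_methodkey body is modeled on
datahub.utilities.cachetools_keys, the log message is illustrative rather than
the source's actual wording, and FakeOperationConfig stands in for the real
pydantic OperationConfig:

    import logging
    from typing import Any, Hashable

    import cachetools
    import cachetools.keys

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    def self_methodkey(self: Any, *args: Any, **kwargs: Any) -> Hashable:
        # Key on the identity of the first argument: pydantic configs may be
        # unhashable, and repeat calls with the same object should share an entry.
        return cachetools.keys.hashkey(id(self), *args, **kwargs)


    @cachetools.cached(cache=cachetools.LRUCache(maxsize=1), key=self_methodkey)
    def is_profiling_enabled(operation_config: Any) -> bool:
        # Illustrative log line; it fires only on a cache miss.
        logger.info("Evaluating profiling operation_config")
        return operation_config.lower_freq_profile_enabled is False


    class FakeOperationConfig:
        lower_freq_profile_enabled = False


    config = FakeOperationConfig()
    print(is_profiling_enabled(config))  # logs once, prints True
    print(is_profiling_enabled(config))  # cache hit: no second log line

maxsize=1 presumably suffices because a given ingestion run evaluates a single
operation_config; keying on id() avoids requiring the model to be hashable, at
the cost that a new but identical config object recomputes (and re-logs) the
result.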