From 6ae4965b881b492d2371b29b3e26fcfeac69db5a Mon Sep 17 00:00:00 2001 From: Michael Fu <42988210+michaelfu1029@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:27:01 -0800 Subject: [PATCH] Update pool size if environment was created during problematic timeframe (#168) *Issue #, if available:* *Description of changes:* 2.9.2 environments created after July 8(launch) and before Sept 6(fix deployed) have default_pool size 128 so we update this to 10000 (the value used for previous versions) By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Michael Fu --- .../airflow/2.9.2/docker-compose-hybrid.yaml | 1 + images/airflow/2.9.2/docker-compose.yaml | 1 + .../airflow/2.9.2/python/mwaa/entrypoint.py | 43 +++++++++++++++++-- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/images/airflow/2.9.2/docker-compose-hybrid.yaml b/images/airflow/2.9.2/docker-compose-hybrid.yaml index c4c10f8..3da7396 100644 --- a/images/airflow/2.9.2/docker-compose-hybrid.yaml +++ b/images/airflow/2.9.2/docker-compose-hybrid.yaml @@ -18,6 +18,7 @@ x-airflow-common: &airflow-common MWAA__CORE__REQUIREMENTS_PATH: ${MWAA__CORE__REQUIREMENTS_PATH} MWAA__CORE__AUTH_TYPE: "testing" # Additional Airflow configuration can be passed here in JSON form. 
+ MWAA__CORE__CREATED_AT: "Tue Sep 18 23:05:58 UTC 2024" MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS: "{}" MWAA__CORE__FERNET_KEY: '{"FernetKey": "fake-key-nNge+lks3RBeGVrnZ1Dq5GjKerbZKmb7dXNnsNsGy3E="}' MWAA__WEBSERVER__SECRET: '{"secret_key": "fake-key-aYDdF6d+Fjznai5yBW63CUAi0IipJqDHlNSWIun6y8o="}' diff --git a/images/airflow/2.9.2/docker-compose.yaml b/images/airflow/2.9.2/docker-compose.yaml index 075ea20..7196ee4 100644 --- a/images/airflow/2.9.2/docker-compose.yaml +++ b/images/airflow/2.9.2/docker-compose.yaml @@ -18,6 +18,7 @@ x-airflow-common: &airflow-common MWAA__CORE__REQUIREMENTS_PATH: ${MWAA__CORE__REQUIREMENTS_PATH} MWAA__CORE__AUTH_TYPE: "testing" # Additional Airflow configuration can be passed here in JSON form. + MWAA__CORE__CREATED_AT: "Tue Sep 18 23:05:58 UTC 2024" MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS: "{}" MWAA__CORE__FERNET_KEY: '{"FernetKey": "fake-key-nNge+lks3RBeGVrnZ1Dq5GjKerbZKmb7dXNnsNsGy3E="}' MWAA__WEBSERVER__SECRET: '{"secret_key": "fake-key-aYDdF6d+Fjznai5yBW63CUAi0IipJqDHlNSWIun6y8o="}' diff --git a/images/airflow/2.9.2/python/mwaa/entrypoint.py b/images/airflow/2.9.2/python/mwaa/entrypoint.py index aa91d49..f3a09c9 100644 --- a/images/airflow/2.9.2/python/mwaa/entrypoint.py +++ b/images/airflow/2.9.2/python/mwaa/entrypoint.py @@ -25,7 +25,7 @@ # fmt: on # Python imports -from datetime import timedelta +from datetime import datetime, timedelta from functools import cache from typing import Callable, Dict, List, Optional import asyncio @@ -66,7 +66,7 @@ from mwaa.subprocess.subprocess import Subprocess, run_subprocesses from mwaa.utils.cmd import run_command from mwaa.utils.dblock import with_db_lock - +from mwaa.utils.statsd import get_statsd # Usually, we pass the `__name__` variable instead as that defaults to the # module path, i.e. `mwaa.entrypoint` in this case. However, since this is @@ -102,7 +102,6 @@ # failures due to scheduler failure. 
HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT = timedelta(seconds=130)

-
 async def airflow_db_init(environ: dict[str, str]):
     """
     Initialize Airflow database.
@@ -116,6 +115,43 @@ async def airflow_db_init(environ: dict[str, str]):
     await run_command("python3 -m mwaa.database.migrate", env=environ)


+async def increase_pool_size_if_default_size(environ: dict[str, str]):
+    """
+    Raise the default_pool size if it still has the regressed value of 128.
+
+    Some 2.9.2 environments were created with a default_pool size of 128. When
+    MWAA__CORE__CREATED_AT (read from os.environ) falls inside the affected window
+    and the pool size is still 128, the size is set to 10000.
+
+    :param environ: Environment variables passed to the airflow CLI commands.
+    """
+    created_at = os.environ.get("MWAA__CORE__CREATED_AT")
+    problematic_pool_size = 128
+
+    if created_at:
+        try:
+            date_format = "%a %b %d %H:%M:%S %Z %Y"
+            created_date = datetime.strptime(created_at, date_format)
+            # Has a little buffer from when 2.9.2 was released and when the fix was fully deployed
+            issue_beginning = datetime(2024, 7, 8)
+            issue_resolution = datetime(2024, 9, 6)
+
+            if issue_beginning < created_date < issue_resolution:
+                command_output = []
+
+                # Get the current default_pool size
+                await run_command("airflow pools get default_pool | grep default_pool | awk '{print $3}'",
+                    env=environ, stdout_logging_method=lambda output: command_output.append(output))
+
+                # Increasing the pool size if it is the default size
+                if len(command_output) == 1 and int(command_output[0]) == problematic_pool_size:
+                    logger.info("Setting default_pool size to 10000.")
+                    await run_command("airflow pools set default_pool 10000 default", env=environ)
+                    get_statsd().incr("mwaa.pool.increased_default_pool_size", 1)
+        except Exception as error:
+            # Best-effort fix-up: format the error (str + Exception raises TypeError) and continue.
+            logger.error(f"Error checking if pool issue is present: {error}")
+
 @with_db_lock(4321)
 async def airflow_db_reset(environ: dict[str, str]):
     """
@@ -736,6 +772,7 @@ async
def main() -> None: await install_user_requirements(command, environ) await airflow_db_init(environ) + await increase_pool_size_if_default_size(environ) if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing": # In "simple" auth mode, we create an admin user "airflow" with password # "airflow". We use this to make the Docker Compose setup easy to use without