Skip to content

Commit

Permalink
Update pool size if environment was created during problematic timefr…
Browse files Browse the repository at this point in the history
…ame (#168)

*Issue #, if available:*

*Description of changes:*
2.9.2 environments created after July 8 (launch) and before Sept 6 (fix
deployed) have a default_pool size of 128, so we update it to 10000 (the
value used in previous versions).

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Michael Fu <[email protected]>
  • Loading branch information
michaelfu1029 and Michael Fu authored Nov 14, 2024
1 parent 69f8795 commit 6ae4965
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
1 change: 1 addition & 0 deletions images/airflow/2.9.2/docker-compose-hybrid.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ x-airflow-common: &airflow-common
MWAA__CORE__REQUIREMENTS_PATH: ${MWAA__CORE__REQUIREMENTS_PATH}
MWAA__CORE__AUTH_TYPE: "testing"
# Additional Airflow configuration can be passed here in JSON form.
MWAA__CORE__CREATED_AT: "Tue Sep 18 23:05:58 UTC 2024"
MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS: "{}"
MWAA__CORE__FERNET_KEY: '{"FernetKey": "fake-key-nNge+lks3RBeGVrnZ1Dq5GjKerbZKmb7dXNnsNsGy3E="}'
MWAA__WEBSERVER__SECRET: '{"secret_key": "fake-key-aYDdF6d+Fjznai5yBW63CUAi0IipJqDHlNSWIun6y8o="}'
Expand Down
1 change: 1 addition & 0 deletions images/airflow/2.9.2/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ x-airflow-common: &airflow-common
MWAA__CORE__REQUIREMENTS_PATH: ${MWAA__CORE__REQUIREMENTS_PATH}
MWAA__CORE__AUTH_TYPE: "testing"
# Additional Airflow configuration can be passed here in JSON form.
MWAA__CORE__CREATED_AT: "Tue Sep 18 23:05:58 UTC 2024"
MWAA__CORE__CUSTOM_AIRFLOW_CONFIGS: "{}"
MWAA__CORE__FERNET_KEY: '{"FernetKey": "fake-key-nNge+lks3RBeGVrnZ1Dq5GjKerbZKmb7dXNnsNsGy3E="}'
MWAA__WEBSERVER__SECRET: '{"secret_key": "fake-key-aYDdF6d+Fjznai5yBW63CUAi0IipJqDHlNSWIun6y8o="}'
Expand Down
43 changes: 40 additions & 3 deletions images/airflow/2.9.2/python/mwaa/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# fmt: on

# Python imports
from datetime import timedelta
from datetime import datetime, timedelta
from functools import cache
from typing import Callable, Dict, List, Optional
import asyncio
Expand Down Expand Up @@ -66,7 +66,7 @@
from mwaa.subprocess.subprocess import Subprocess, run_subprocesses
from mwaa.utils.cmd import run_command
from mwaa.utils.dblock import with_db_lock

from mwaa.utils.statsd import get_statsd

# Usually, we pass the `__name__` variable instead as that defaults to the
# module path, i.e. `mwaa.entrypoint` in this case. However, since this is
Expand Down Expand Up @@ -102,7 +102,6 @@
# failures due to scheduler failure.
HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT = timedelta(seconds=130)


async def airflow_db_init(environ: dict[str, str]):
"""
Initialize Airflow database.
Expand All @@ -116,6 +115,43 @@ async def airflow_db_init(environ: dict[str, str]):
await run_command("python3 -m mwaa.database.migrate", env=environ)


async def increase_pool_size_if_default_size(environ: dict[str, str]):
    """
    Raise the default_pool size for environments hit by the 2.9.2 regression.

    Some 2.9.2 environments were created with a default_pool size of 128. This
    function checks whether the environment was created during the problematic
    timeframe and, if the pool is still at that size (i.e. it has not been
    updated by the customer), sets it to 10000.

    The creation time is read from the `MWAA__CORE__CREATED_AT` variable of the
    process environment. The check is best-effort: any failure is logged and
    never propagated to the caller.

    :param environ: A dictionary containing the environment variables. It is
      passed to the Airflow CLI commands executed by this function.
    """
    created_at = os.environ.get("MWAA__CORE__CREATED_AT")
    if not created_at:
        # Nothing to compare against, so there is nothing to fix.
        return

    problematic_pool_size = 128

    try:
        date_format = "%a %b %d %H:%M:%S %Z %Y"
        created_date = datetime.strptime(created_at, date_format)
        # Has a little buffer from when 2.9.2 was released and when the fix
        # was fully deployed.
        issue_beginning = datetime(2024, 7, 8)
        issue_resolution = datetime(2024, 9, 6)

        if not issue_beginning < created_date < issue_resolution:
            return

        command_output: list[str] = []

        # Get the current default_pool size. The shell pipeline prints only
        # the size column, which is captured line by line.
        await run_command(
            "airflow pools get default_pool | grep default_pool | awk '{print $3}'",
            env=environ,
            stdout_logging_method=lambda output: command_output.append(output),
        )

        # Increase the pool size only if it is still the problematic default,
        # so a value chosen by the customer is never overwritten.
        if (
            len(command_output) == 1
            and int(command_output[0]) == problematic_pool_size
        ):
            logger.info("Setting default_pool size to 10000.")
            await run_command(
                "airflow pools set default_pool 10000 default", env=environ
            )
            stats = get_statsd()
            stats.incr("mwaa.pool.increased_default_pool_size", 1)
    except Exception as error:
        # Pass the exception as a logging argument: concatenating it to a str
        # raises TypeError and would turn this best-effort check into a crash.
        logger.error("Error checking if pool issue is present: %s", error)

@with_db_lock(4321)
async def airflow_db_reset(environ: dict[str, str]):
"""
Expand Down Expand Up @@ -736,6 +772,7 @@ async def main() -> None:

await install_user_requirements(command, environ)
await airflow_db_init(environ)
await increase_pool_size_if_default_size(environ)
if os.environ.get("MWAA__CORE__AUTH_TYPE", "").lower() == "testing":
# In "simple" auth mode, we create an admin user "airflow" with password
# "airflow". We use this to make the Docker Compose setup easy to use without
Expand Down

0 comments on commit 6ae4965

Please sign in to comment.