From 6235cf897598b03ace2e52bb677b478a5612ccad Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 12 Sep 2024 19:03:24 +0530 Subject: [PATCH] Revert "Handle Example dags case when checking for missing files (#41856)" (#42193) This reverts commit 435e9687b0c56499bc29c21d3cada8ae9e0a8c53. --- airflow/dag_processing/manager.py | 11 +-- tests/dag_processing/test_job_runner.py | 89 +++++++++++++------------ 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 7e404307dccd8..fee515dc07164 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -41,7 +41,6 @@ from tabulate import tabulate import airflow.models -from airflow import example_dags from airflow.api_internal.internal_api_call import internal_api_call from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest from airflow.configuration import conf @@ -70,8 +69,6 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks -example_dag_folder = next(iter(example_dags.__path__)) - if TYPE_CHECKING: from multiprocessing.connection import Connection as MultiprocessingConnection @@ -530,11 +527,9 @@ def deactivate_stale_dags( for dag in dags_parsed: # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder, - # DAGs from the previous DAGs folder will be marked as stale. We also need to handle example dags - # differently. Note that this change has no impact on standalone DAG processors. - dag_not_in_current_dag_folder = ( - not os.path.commonpath([dag.fileloc, example_dag_folder]) == example_dag_folder - ) and (os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory) + # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact + # on standalone DAG processors. 
+ dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 0e15a2d1f6690..9b8437d77d50a 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -773,57 +773,58 @@ def test_scan_stale_dags_when_dag_folder_change(self): def get_dag_string(filename) -> str: return open(TEST_DAG_FOLDER / filename).read() - def add_dag_to_db(file_path, dag_id, processor_subdir): - dagbag = DagBag(file_path, read_dags_from_db=False) - dag = dagbag.get_dag(dag_id) - dag.fileloc = file_path - dag.last_parsed_time = timezone.utcnow() - dag.sync_to_db(processor_subdir=processor_subdir) + with tempfile.TemporaryDirectory() as tmpdir: + old_dag_home = tempfile.mkdtemp(dir=tmpdir) + old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py") + old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode()) + old_dag_file.flush() + new_dag_home = tempfile.mkdtemp(dir=tmpdir) + new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py") + new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode()) + new_dag_file.flush() + + manager = DagProcessorJobRunner( + job=Job(), + processor=DagFileProcessorManager( + dag_directory=new_dag_home, + max_runs=1, + processor_timeout=timedelta(minutes=10), + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ), + ) - def create_dag_folder(dag_id): - dag_home = tempfile.mkdtemp(dir=tmpdir) - dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py") - dag_file.write(get_dag_string(dag_id).encode()) - dag_file.flush() - return 
dag_home, dag_file + dagbag = DagBag(old_dag_file.name, read_dags_from_db=False) + other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False) - with tempfile.TemporaryDirectory() as tmpdir: - old_dag_home, old_dag_file = create_dag_folder("test_example_bash_operator.py") - new_dag_home, new_dag_file = create_dag_folder("test_scheduler_dags.py") - example_dag_home, example_dag_file = create_dag_folder("test_dag_warnings.py") - - with mock.patch("airflow.dag_processing.manager.example_dag_folder", example_dag_home): - manager = DagProcessorJobRunner( - job=Job(), - processor=DagFileProcessorManager( - dag_directory=new_dag_home, - max_runs=1, - processor_timeout=timedelta(minutes=10), - signal_conn=MagicMock(), - dag_ids=[], - pickle_dags=False, - async_mode=True, - ), - ) + with create_session() as session: + # Add DAG from old DAG home to the DB + dag = dagbag.get_dag("test_example_bash_operator") + dag.fileloc = old_dag_file.name + dag.last_parsed_time = timezone.utcnow() + dag.sync_to_db(processor_subdir=old_dag_home) - with create_session() as session: - add_dag_to_db(old_dag_file.name, "test_example_bash_operator", old_dag_home) - add_dag_to_db(new_dag_file.name, "test_start_date_scheduling", new_dag_home) - add_dag_to_db(example_dag_file.name, "test_dag_warnings", example_dag_home) + # Add DAG from new DAG home to the DB + other_dag = other_dagbag.get_dag("test_start_date_scheduling") + other_dag.fileloc = new_dag_file.name + other_dag.last_parsed_time = timezone.utcnow() + other_dag.sync_to_db(processor_subdir=new_dag_home) - manager.processor._file_paths = [new_dag_file, example_dag_file] + manager.processor._file_paths = [new_dag_file] - active_dag_count = ( - session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - ) - assert active_dag_count == 3 + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 2 - manager.processor._scan_stale_dags() + 
manager.processor._scan_stale_dags() - active_dag_count = ( - session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() - ) - assert active_dag_count == 2 + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 1 @mock.patch( "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock