Skip to content

Commit

Permalink
Emit error on duplicated DAG ID (apache#15302)
Browse files Browse the repository at this point in the history
This will be shown in logs on initialization, and flashed in the UI on later
scheduled refreshes.

closes apache#15248
  • Loading branch information
uranusjr authored May 6, 2021
1 parent faa4a52 commit 0967453
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 60 deletions.
4 changes: 2 additions & 2 deletions airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

# [START instantiate_dag]
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
def tutorial_taskflow_api_etl_virtualenv():
"""
### TaskFlow API Tutorial Documentation
This is a simple ETL data pipeline example which demonstrates the use of
Expand Down Expand Up @@ -107,7 +107,7 @@ def load(total_order_value: float):


# [START dag_invocation]
tutorial_etl_dag = tutorial_taskflow_api_etl()
tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()
# [END dag_invocation]

# [END tutorial]
13 changes: 13 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ class AirflowDagCycleException(AirflowException):
"""Raise when there is a cycle in Dag definition"""


class AirflowDagDuplicatedIdException(AirflowException):
    """Raised when a DAG's ID is already taken by another DAG in the bag."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        # Record the conflict details before delegating to the base class,
        # which stores the same triple in ``args`` for pickling/repr.
        self.dag_id = dag_id
        self.incoming = incoming
        self.existing = existing
        super().__init__(dag_id, incoming, existing)

    def __str__(self) -> str:
        # Message is surfaced verbatim as an import error in logs and the UI.
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"


class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in Dag definition"""

Expand Down
62 changes: 49 additions & 13 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException, SerializedDagNotFound
from airflow.exceptions import (
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
SerializedDagNotFound,
)
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import test_cycle
Expand Down Expand Up @@ -215,8 +220,15 @@ def get_dag(self, dag_id, session: Session = None):
# If the dag corresponding to root_dag_id is absent or expired
is_missing = root_dag_id not in self.dags
is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
if is_expired:
# Remove associated dags so we can re-add them.
self.dags = {
key: dag
for key, dag in self.dags.items()
if root_dag_id != key and not (dag.is_subdag and root_dag_id == dag.parent_dag.dag_id)
}
if is_missing or is_expired:
# Reprocess source file
# Reprocess source file.
found_dags = self.process_file(
filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
)
Expand Down Expand Up @@ -381,7 +393,11 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = f"Invalid Cron expression: {cron_e}"
self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk
except (AirflowDagCycleException, AirflowClusterPolicyViolation) as exception:
except (
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowClusterPolicyViolation,
) as exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(exception)
self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk
Expand All @@ -390,7 +406,17 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
def bag_dag(self, dag, root_dag):
    """
    Adds the DAG into the bag, recurses into sub dags.

    :param dag: the DAG to add to the bag.
    :param root_dag: the top-level DAG this (possibly sub-)DAG belongs to.
    :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
    :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
    """
    # Delegate to the private implementation so the ``recursive`` flag is not
    # exposed in the public API; external callers always bag recursively.
    self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)

def _bag_dag(self, *, dag, root_dag, recursive):
"""Actual implementation of bagging a dag.
The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``,
intended to only be used by the ``_bag_dag()`` implementation.
"""
test_cycle(dag) # throws if a task cycle is found

Expand All @@ -406,24 +432,34 @@ def bag_dag(self, dag, root_dag):
subdags = dag.subdags

try:
for subdag in subdags:
subdag.full_filepath = dag.full_filepath
subdag.parent_dag = dag
subdag.is_subdag = True
self.bag_dag(dag=subdag, root_dag=root_dag)

# DAG.subdags automatically performs DFS search, so we don't recurse
# into further _bag_dag() calls.
if recursive:
for subdag in subdags:
subdag.full_filepath = dag.full_filepath
subdag.parent_dag = dag
subdag.is_subdag = True
self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

prev_dag = self.dags.get(dag.dag_id)
if prev_dag and prev_dag.full_filepath != dag.full_filepath:
raise AirflowDagDuplicatedIdException(
dag_id=dag.dag_id,
incoming=dag.full_filepath,
existing=self.dags[dag.dag_id].full_filepath,
)
self.dags[dag.dag_id] = dag
self.log.debug('Loaded DAG %s', dag)
except AirflowDagCycleException as cycle_exception:
except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
# There was an error in bagging the dag. Remove it from the list of dags
self.log.exception('Exception bagging dag: %s', dag.dag_id)
# Only necessary at the root level since DAG.subdags automatically
# performs DFS to search through all subdags
if dag == root_dag:
if recursive:
for subdag in subdags:
if subdag.dag_id in self.dags:
del self.dags[subdag.dag_id]
raise cycle_exception
raise

def collect_dags(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def check_notebook(inlets, execution_date):


with DAG(
dag_id='example_papermill_operator',
dag_id='example_papermill_operator_2',
default_args=default_args,
schedule_interval='0 0 * * *',
start_date=days_ago(2),
Expand Down
2 changes: 2 additions & 0 deletions tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def _prepare_db(self):
dagbag = self.app.dag_bag # pylint: disable=no-member
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dag.sync_to_db()
dagbag.dags.pop(self.DAG_ID, None)
dagbag.bag_dag(dag=dag, root_dag=dag)
with create_session() as session:
self.ti = TaskInstance(
Expand Down Expand Up @@ -174,6 +175,7 @@ def test_get_logs_of_removed_task(self, session):
# Recreate DAG without tasks
dagbag = self.app.dag_bag # pylint: disable=no-member
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
del dagbag.dags[self.DAG_ID]
dagbag.bag_dag(dag=dag, root_dag=dag)

key = self.app.config["SECRET_KEY"]
Expand Down
14 changes: 8 additions & 6 deletions tests/core/test_impersonation_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,19 @@ def create_user():

@pytest.mark.quarantined
class TestImpersonation(unittest.TestCase):
def setUp(self):
check_original_docker_image()
grant_permissions()
add_default_pool_if_not_exists()
self.dagbag = models.DagBag(
@classmethod
def setUpClass(cls):
cls.dagbag = models.DagBag(
dag_folder=TEST_DAG_FOLDER,
include_examples=False,
)
logger.info('Loaded DAGS:')
logger.info(self.dagbag.dagbag_report())
logger.info(cls.dagbag.dagbag_report())

def setUp(self):
check_original_docker_image()
grant_permissions()
add_default_pool_if_not_exists()
create_user()

def tearDown(self):
Expand Down
37 changes: 0 additions & 37 deletions tests/dags/test_backfill_pooled_tasks.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/dags/test_dag_with_no_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
"start_date": DEFAULT_DATE,
}

with DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') as dag:
with DAG(dag_id="test_dag_with_no_tags", default_args=default_args, schedule_interval='@once') as dag:
task_a = DummyOperator(task_id="test_task_a")
32 changes: 32 additions & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,38 @@ def test_process_file_that_contains_multi_bytes_char(self):
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
assert [] == dagbag.process_file(f.name)

def test_process_file_duplicated_dag_id(self):
    """Loading a DAG with ID that already existed in a DAG bag should result in an import error."""
    dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

    def create_dag():
        # This body is never executed here; it is introspected below and its
        # source written to two temp files so both files define the same DAG ID.
        from airflow.decorators import dag

        @dag(default_args={'owner': 'owner1'})
        def my_flow():
            pass

        my_dag = my_flow()  # noqa # pylint: disable=unused-variable

    # [1:] drops the ``def create_dag():`` header; [12:] strips the leading
    # 12-space indent so the remaining lines form a valid module-level DAG file.
    # NOTE(review): the 12 assumes this method's exact nesting depth — confirm
    # if the test class layout changes.
    source_lines = [line[12:] for line in inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
    with NamedTemporaryFile("w+", encoding="utf8") as tf_1, NamedTemporaryFile(
        "w+", encoding="utf8"
    ) as tf_2:
        # Two distinct files with identical contents → identical dag_id "my_flow".
        tf_1.writelines(source_lines)
        tf_2.writelines(source_lines)
        tf_1.flush()
        tf_2.flush()

        # First file loads cleanly and registers the DAG.
        found_1 = dagbag.process_file(tf_1.name)
        assert len(found_1) == 1 and found_1[0].dag_id == "my_flow"
        assert dagbag.import_errors == {}
        dags_in_bag = dagbag.dags

        # Second file with the duplicate ID is rejected: no DAGs returned, an
        # import error is recorded for it, and the bag itself is untouched.
        found_2 = dagbag.process_file(tf_2.name)
        assert len(found_2) == 0
        assert dagbag.import_errors[tf_2.name].startswith("Ignoring DAG")
        assert dagbag.dags == dags_in_bag  # Should not change.

def test_zip_skip_log(self):
"""
test the loading of a DAG from within a zip file that skips another file because
Expand Down
3 changes: 3 additions & 0 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,8 @@ def setUp(self):
self.login()

dagbag = self.app.dag_bag
dagbag.dags.pop(self.DAG_ID, None)
dagbag.dags.pop(self.DAG_ID_REMOVED, None)
dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
dag_removed = DAG(self.DAG_ID_REMOVED, start_date=self.DEFAULT_DATE)
dagbag.bag_dag(dag=dag, root_dag=dag)
Expand Down Expand Up @@ -1798,6 +1800,7 @@ def __init__(self, test, endpoint):

def setup(self):
dagbag = self.test.app.dag_bag
dagbag.dags.pop(self.DAG_ID, None)
dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
dagbag.bag_dag(dag=dag, root_dag=dag)
for run_data in self.RUNS_DATA:
Expand Down

0 comments on commit 0967453

Please sign in to comment.