Skip to content

Commit

Permalink
Improve Test Coverage for Kubernetes Executor (apache#15617)
Browse files Browse the repository at this point in the history
According to the current stats of codecov.io assessment of the Airflow code base, the test coverage for
the kubernetes_executor.py module is about 63%. This metric definitely
needs to be improved upon.

This PR addresses the unit test coverage for the delete_pod method in the AirflowKubernetesScheduler class in the kubernetes_executor.py module. And when merged will help to improve the test coverage metric.

fixes part of apache#15523
  • Loading branch information
drdenzy authored May 4, 2021
1 parent cf583b9 commit dd56875
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@

import pytest
from kubernetes.client import models as k8s
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

from airflow.utils import timezone
from tests.test_utils.config import conf_vars

try:
from kubernetes.client.rest import ApiException

from airflow.executors.kubernetes_executor import (
AirflowKubernetesScheduler,
KubernetesExecutor,
Expand Down Expand Up @@ -120,6 +119,71 @@ def test_execution_date_serialize_deserialize(self):

assert datetime_obj == new_datetime_obj

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_successfully(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace-1"

mock_delete_namespace = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace

kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)

mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_raises_404(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace-2"

mock_delete_namespace = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace

# ApiException is raised because status is not 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=400)
kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()

with pytest.raises(ApiException):
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_404_not_raised(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace-3"

mock_delete_namespace = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace

# ApiException not raised because the status is 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=404)
kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()

kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())


class TestKubernetesExecutor(unittest.TestCase):
"""
Expand Down Expand Up @@ -160,7 +224,6 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
('kubernetes', 'pod_template_file'): path,
}
with conf_vars(config):

kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
# Execute a task while the Api Throws errors
Expand Down

0 comments on commit dd56875

Please sign in to comment.