
Commit 414bb20

Serve logs with Scheduler when using Local or Sequential Executor (apache#15557)
Currently, the `serve_logs` endpoint only exists on Celery workers. This
means that if someone launches Airflow with the `LocalExecutor` and wants to
grab task logs from the scheduler, there is no way to get them to the
webserver if it is on a different pod/machine.

This commit makes the scheduler automatically serve logs when using the
`LocalExecutor` or `SequentialExecutor`. However, it means that for
Airflow <= 2.0.2, the Helm Chart won't serve logs.
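For context: when a task log is not local to the webserver, Airflow's file
task handler fetches it over HTTP from the host that ran the task, which is
why that host must expose a `serve_logs` endpoint. A minimal sketch of such a
fetch (the port 8793 and the `/log/<relative path>` URL layout are
assumptions for illustration, not taken from this commit):

    import requests

    def fetch_task_log(hostname: str, log_relative_path: str) -> str:
        """Pull a task log over HTTP from the machine that produced it.

        Assumes the remote host exposes a serve_logs endpoint on the
        conventional worker-logs port 8793 under /log/<relative path>.
        """
        url = f"http://{hostname}:8793/log/{log_relative_path}"
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.text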

closes apache#15070
closes apache#13331
closes apache#15071
closes apache#14222
kaxil authored Apr 29, 2021
1 parent 053d903 commit 414bb20
Showing 4 changed files with 101 additions and 23 deletions.
1 change: 1 addition & 0 deletions airflow/cli/cli_parser.py
@@ -1517,6 +1517,7 @@ class GroupCommand(NamedTuple):
        ARG_STDOUT,
        ARG_STDERR,
        ARG_LOG_FILE,
        ARG_SKIP_SERVE_LOGS,
    ),
    epilog=(
        'Signals:\n'
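With `ARG_SKIP_SERVE_LOGS` registered on the `scheduler` command, the flag
surfaces as `args.skip_serve_logs`. A quick sketch of the parsing behavior
(plain argparse semantics; only the flag name comes from this commit):

    from airflow.cli import cli_parser

    parser = cli_parser.get_parser()

    args = parser.parse_args(["scheduler"])
    print(args.skip_serve_logs)  # False: the scheduler also serves logs

    args = parser.parse_args(["scheduler", "--skip-serve-logs"])
    print(args.skip_serve_logs)  # True: opt out of the log-serving subprocess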
22 changes: 22 additions & 0 deletions airflow/cli/commands/scheduler_command.py
@@ -17,6 +17,8 @@

"""Scheduler command"""
import signal
from multiprocessing import Process
from typing import Optional

import daemon
from daemon.pidfile import TimeoutPIDLockFile
@@ -30,6 +32,8 @@
@cli_utils.action_logging
def scheduler(args):
"""Starts Airflow Scheduler"""
skip_serve_logs = args.skip_serve_logs

print(settings.HEADER)
job = SchedulerJob(
subdir=process_subdir(args.subdir),
@@ -50,9 +54,27 @@ def scheduler(args):
            stderr=stderr_handle,
        )
        with ctx:
            sub_proc = _serve_logs(skip_serve_logs)
            job.run()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        sub_proc = _serve_logs(skip_serve_logs)
        job.run()

    if sub_proc:
        sub_proc.terminate()


def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
    """Starts serve_logs sub-process"""
    from airflow.configuration import conf
    from airflow.utils.serve_logs import serve_logs

    if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]:
        if skip_serve_logs is False:
            sub_proc = Process(target=serve_logs)
            sub_proc.start()
            return sub_proc
    return None
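For reference, `airflow.utils.serve_logs.serve_logs` is essentially a small
HTTP file server over the log folder. A rough sketch of the idea, not the
actual implementation (the route name, port, and config key here are
assumptions):

    import os

    import flask

    from airflow.configuration import conf

    def sketch_serve_logs() -> None:
        """Serve files from the Airflow log folder over HTTP."""
        app = flask.Flask(__name__)
        log_folder = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))

        @app.route("/log/<path:filename>")
        def serve_log(filename: str):
            # send_from_directory refuses paths that escape log_folder.
            return flask.send_from_directory(log_folder, filename, as_attachment=False)

        # 8793 is the conventional worker-logs port in Airflow deployments.
        app.run(host="0.0.0.0", port=8793)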
29 changes: 6 additions & 23 deletions chart/templates/scheduler/scheduler-deployment.yaml
@@ -141,6 +141,12 @@ spec:
SchedulerJob.latest_heartbeat.desc()).limit(1).first()
sys.exit(0 if job.is_alive() else 1)
{{- if and $local (not $elasticsearch) }}
# Serve logs if we're in local mode and we don't have elasticsearch enabled.
ports:
  - name: worker-logs
    containerPort: {{ .Values.ports.workerLogs }}
{{- end }}
resources:
{{ toYaml .Values.scheduler.resources | indent 12 }}
volumeMounts:
@@ -178,29 +184,6 @@ spec:
volumeMounts:
  - name: logs
    mountPath: {{ template "airflow_logs" . }}
{{- if and $local (not $elasticsearch) }}
# Start the sidecar log server if we're in local mode and
# we don't have elasticsearch enabled.
- name: scheduler-logs
  image: {{ template "airflow_image" . }}
  imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
  args: ["serve_logs"]
  ports:
    - name: worker-logs
      containerPort: {{ .Values.ports.workerLogs }}
  volumeMounts:
    - name: logs
      mountPath: {{ template "airflow_logs" . }}
    - name: config
      mountPath: {{ template "airflow_config_path" . }}
      subPath: airflow.cfg
      readOnly: true
  envFrom:
  {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }}
  env:
  {{- include "custom_airflow_environment" . | indent 10 }}
  {{- include "standard_airflow_environment" . | indent 10 }}
{{- end }}
{{- if .Values.scheduler.extraContainers }}
{{- toYaml .Values.scheduler.extraContainers | nindent 8 }}
{{- end }}
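With the sidecar gone, the scheduler container itself must answer on the
worker-logs port. A hypothetical smoke check (the pod IP lookup and the port
are assumptions; any HTTP response, even a 404, proves the listener is up):

    import requests

    def scheduler_serves_logs(pod_ip: str, port: int = 8793) -> bool:
        """Return True if a scheduler pod answers on the worker-logs port."""
        try:
            # Even a 404 for a missing file means the log server is listening.
            requests.get(f"http://{pod_ip}:{port}/log/healthcheck.log", timeout=2)
            return True
        except requests.exceptions.ConnectionError:
            return False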
72 changes: 72 additions & 0 deletions tests/cli/commands/test_scheduler_command.py
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from unittest import mock

from parameterized import parameterized

from airflow.cli import cli_parser
from airflow.cli.commands import scheduler_command
from airflow.utils.serve_logs import serve_logs
from tests.test_utils.config import conf_vars


class TestSchedulerCommand(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.parser = cli_parser.get_parser()

    @parameterized.expand(
        [
            ("CeleryExecutor", False),
            ("LocalExecutor", True),
            ("SequentialExecutor", True),
            ("KubernetesExecutor", False),
        ]
    )
    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
    @mock.patch("airflow.cli.commands.scheduler_command.Process")
    def test_serve_logs_on_scheduler(
        self,
        executor,
        expect_serve_logs,
        mock_process,
        mock_scheduler_job,
    ):
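        # SchedulerJob is mocked so no real scheduler loop runs; Process is
        # mocked so the test can assert whether serve_logs would be launched.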
        args = self.parser.parse_args(['scheduler'])

        with conf_vars({("core", "executor"): executor}):
            scheduler_command.scheduler(args)
            if expect_serve_logs:
                mock_process.assert_called_once_with(target=serve_logs)
            else:
                mock_process.assert_not_called()

    @parameterized.expand(
        [
            ("LocalExecutor",),
            ("SequentialExecutor",),
        ]
    )
    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
    @mock.patch("airflow.cli.commands.scheduler_command.Process")
    def test_skip_serve_logs(self, executor, mock_process, mock_scheduler_job):
        args = self.parser.parse_args(['scheduler', '--skip-serve-logs'])
        with conf_vars({("core", "executor"): executor}):
            scheduler_command.scheduler(args)
            mock_process.assert_not_called()
