Skip to content

Commit

Permalink
feat(cli): add --follow flag to logs command (reanahub#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Sep 30, 2024
1 parent 79d0483 commit 90a9109
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 92 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)
Expand Down
148 changes: 148 additions & 0 deletions reana_client/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@

import functools
import json
import logging
import os
import shlex
import sys
import time
import traceback
from typing import Callable, NoReturn, Optional, List, Tuple, Union

import click
import tablib

from reana_commons.config import REANA_COMPUTE_BACKENDS
from reana_commons.utils import click_table_printer

from reana_client.config import (
Expand Down Expand Up @@ -409,3 +413,147 @@ def output_user_friendly_logs(workflow_logs, steps):
f"Step {job_name_or_id} emitted no logs.",
msg_type="info",
)


def retrieve_workflow_logs(
workflow,
access_token,
json_format,
filters=None,
page=None,
size=None,
): # noqa: D301
"""Retrieve workflow logs."""
from reana_client.api.client import get_workflow_logs

available_filters = {
"step": "job_name",
"compute_backend": "compute_backend",
"docker_img": "docker_img",
"status": "status",
}
steps = []
chosen_filters = dict()

if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
msg_type="error",
)
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Input status value {} is not valid. ".format(value),
msg_type="error",
),
sys.exit(1)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)

response = get_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(workflow_logs, None if not steps else list(set(steps)))


def follow_workflow_logs(
workflow,
access_token,
interval,
step=None,
): # noqa: D301
"""Continuously poll for workflow or job logs."""
from reana_client.api.client import get_workflow_logs, get_workflow_status

msg = f"Following logs for workflow: {workflow}"
if step:
msg += f", step: {step}"
display_message(msg, "info")

previous_logs = ""

while True:
response = get_workflow_logs(
workflow,
access_token,
steps=None if not step else [step],
).get("logs")
json_response = json.loads(response)

if step:
jobs = json_response["job_logs"]

if not jobs:
raise Exception(f"Step data not found: {step}")

job = next(
iter(jobs.values())
) # get values of the first job
logs = job["logs"]
status = job["status"]
else:
logs = json_response["workflow_logs"]
status = get_workflow_status(workflow, access_token).get("status")

previous_lines = previous_logs.splitlines()
new_lines = logs.splitlines()

diff = "\n".join([x for x in new_lines if x not in previous_lines])
if diff != "" and diff != "\n":
display_message(diff)

if status in ["finished", "failed", "stopped", "deleted"]:
display_message("")
display_message(f"Finished, status: {status}", "info")
return
previous_logs = logs
time.sleep(interval)
158 changes: 66 additions & 92 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
key_value_to_dict,
parse_filter_parameters,
requires_environments,
retrieve_workflow_logs,
follow_workflow_logs,
)
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
from reana_client.printer import display_message
Expand All @@ -47,7 +49,7 @@
validate_input_parameters,
validate_workflow_name_parameter,
)
from reana_commons.config import INTERACTIVE_SESSION_TYPES, REANA_COMPUTE_BACKENDS
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_commons.errors import REANAValidationError
from reana_commons.validation.operational_options import validate_operational_options

Expand Down Expand Up @@ -886,6 +888,30 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
multiple=True,
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
)
@click.option(
"--follow",
"follow",
is_flag=True,
default=False,
help="Follow the logs of the of running workflow or job (similar to `tail -f`). "
"If workflow or job finishes running, the command exits.",
)
@click.option(
"-s",
"--step",
"step",
help="Step name to follow logs for. "
"If flag is supplied, command follows a specified job logs. "
"If it is not supplied, command follows workflow logs. "
"If --follow flag is not supplied, this flag is ignored.",
)
@click.option(
"-i",
"--interval",
"interval",
default=10,
help="Sleep time in seconds between log polling if log following is enabled. [default=10]",
)
@add_pagination_options
@check_connection
@click.pass_context
Expand All @@ -894,115 +920,63 @@ def workflow_logs(
workflow,
access_token,
json_format,
steps=None,
follow,
interval,
filters=None,
page=None,
size=None,
step=None,
): # noqa: D301
"""Get workflow logs.
The ``logs`` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ``logs`` command allows to retrieve logs of a running workflow.
Either retrive logs and print the result or follow the logs of a running workflow/job.
Examples:\n
\t $ reana-client logs -w myanalysis.42
\t $ reana-client logs -w myanalysis.42 -s 1st_step
\t $ reana-client logs -w myanalysis.42 --json
\t $ reana-client logs -w myanalysis.42 --filter status=running
\t $ reana-client logs -w myanalysis.42 --step 1st_step --follow
"""
from reana_client.api.client import get_workflow_logs

available_filters = {
"step": "job_name",
"compute_backend": "compute_backend",
"docker_img": "docker_img",
"status": "status",
}
steps = []
chosen_filters = dict()

logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
if workflow:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
msg_type="error",
)
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Input status value {} is not valid. ".format(value),
msg_type="error",
),
sys.exit(1)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)
try:
response = get_workflow_logs(

if step and not follow:
display_message(
"Ignoring --step as it can only be used together with --follow.",
msg_type="warning",
)
if filters and follow:
display_message(
"Ignoring --filters as it cannot be used together with --follow.",
msg_type="warning",
)
if json_format and follow:
display_message(
"Ignoring --json as it cannot be used together with --follow.",
msg_type="warning",
)
try:
if follow:
follow_workflow_logs(workflow, access_token, interval, step)
else:
retrieve_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
json_format,
filters=filters,
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(
workflow_logs, None if not steps else list(set(steps))
)
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve the logs of a workflow {}: \n"
"{}".format(workflow, str(e)),
msg_type="error",
)
sys.exit(1)
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
msg_type="error",
)
sys.exit(1)


@workflow_execution_group.command("validate")
Expand Down
Loading

0 comments on commit 90a9109

Please sign in to comment.