Skip to content

Commit

Permalink
Merge pull request #203 from dmgav/cli-support
Browse files Browse the repository at this point in the history
CLI support for new API
  • Loading branch information
dmgav authored Dec 29, 2021
2 parents 957fcbf + 7b3ba24 commit 8dafdac
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 31 deletions.
21 changes: 11 additions & 10 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2229,16 +2229,17 @@ async def _function_execute_handler(self, request):

return {"success": success, "msg": msg, "item": item, "task_uid": task_uid}

async def _task_load_result_handler(self, request):
async def _task_result_get_handler(self, request):
"""
Load result of a task executed by the worker process. The request must contain valid ``task_uid``.
Task UIDs are returned by the API used to start tasks. Returned parameters: ``success`` and
``msg`` indicate success of the API call and error message in case of API call failure;
``status`` is the status of the task (``running``, ``completed``, ``not_found``), ``result``
is a dictionary with information about the task. The information is be different for
the completed and running tasks. If ``status=='not_found'``, then is ``result`` is ``{}``.
Returns the information of a task executed by the worker process. The request must contain
valid ``task_uid``, returned by one of APIs that starts tasks. Returned
parameters: ``success`` and ``msg`` indicate success of the API call and error message in
case of API call failure; ``status`` is the status of the task (``running``, ``completed``,
``not_found``), ``result`` is a dictionary with information about the task. The information
is be different for the completed and running tasks. If ``status=='not_found'``, then is
``result`` is ``{}``.
"""
logger.debug("Load result of the task executed by RE worker ...")
logger.debug("Request for the result of the task executed by RE worker ...")

task_uid = None

Expand Down Expand Up @@ -2519,7 +2520,7 @@ async def _zmq_execute(self, msg):
"environment_destroy": "_environment_destroy_handler",
"script_upload": "_script_upload_handler",
"function_execute": "_function_execute_handler",
"task_load_result": "_task_load_result_handler",
"task_result_get": "_task_result_get_handler",
"queue_mode_set": "_queue_mode_set_handler",
"queue_item_add": "_queue_item_add_handler",
"queue_item_add_batch": "_queue_item_add_batch_handler",
Expand Down Expand Up @@ -2699,7 +2700,7 @@ def gen_log_msg(msg_out):
if dict_name in log_msg_out:
d = log_msg_out[dict_name]
for k in d.keys():
d[k] = "..."
d[k] = "{...}"
return log_msg_out

# Send reply back to client
Expand Down
178 changes: 176 additions & 2 deletions bluesky_queueserver/manager/qserver_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ast
import copy
import time as ttime
from datetime import datetime
import pprint
Expand Down Expand Up @@ -45,6 +46,8 @@ class QServerExitCodes(enum.Enum):
qserver environment close # Close RE environment
qserver environment destroy # Destroy RE environment (kill RE worker process)
qserver existing plans # Request the list of existing plans
qserver existing devices # Request the list of existing devices
qserver allowed plans # Request the list of allowed plans
qserver allowed devices # Request the list of allowed devices
qserver permissions reload # Reload user permissions and generate lists of allowed plans and devices.
Expand Down Expand Up @@ -125,6 +128,18 @@ class QServerExitCodes(enum.Enum):
qserver history get # Request plan history
qserver history clear # Clear plan history
qserver function execute <function-params> # Start execution of a function
qserver function execute <function-params> background # ... in the background thread
Example of JSON specification of a function ("args" and "kwargs" are optional):
'{"name": "function_sleep", "args": [20], "kwargs": {}}'
qserver script upload <path-to-file> # Upload a script to RE Worker environment
qserver script upload <path-to-file> background # ... in the background
qserver script upload <path-to-file> update-re # ... allow 'RE' and 'db' to be updated
qserver task result get <task-uid> # Load status or result of a task with the given UID
qserver manager stop # Safely exit RE Manager application
qserver manager stop safe on # Safely exit RE Manager application
qserver manager stop safe off # Force RE Manager application to stop
Expand Down Expand Up @@ -416,6 +431,76 @@ def msg_queue_add_update(params, *, cmd_opt):
return method, prms


def msg_function_execute(params, *, cmd_opt):
"""
Generate outgoing message for `function_execute` command. See ``cli_examples`` for supported formats
for the command.
Parameters
----------
params : list
List of parameters of the command. The first elements of the list is expected to be
``["function", "execute"]``.
cmd_opt : str
Command option, must match ``param[0]``.
Returns
-------
str
Name of the method from RE Manager API
dict
Dictionary of the method parameters
Raises
------
CommandParameterError
"""
# Check if the function was called for the appropriate command
command = "function"
expected_p0 = cmd_opt
if params[0] != expected_p0:
raise ValueError(f"Incorrect parameter value '{params[0]}'. Expected value: '{expected_p0}'")

call_params = {}

try:
if params[0] == "execute":
if len(params) < 2:
raise CommandParameterError(f"Incorrect number of parameters: '{command} {params[0]}'")

item_str = params[1]
try:
# Convert quoted string to dictionary.
item = ast.literal_eval(item_str)
except Exception:
raise CommandParameterError(f"Error occurred while parsing the plan '{item_str}'")

item["item_type"] = "function"

run_in_background = False
allowed_params = ("background",)
for p in params[2:]:
if p not in allowed_params:
raise CommandParameterError(f"Unsupported combination of parameters: '{command} {params}'")
if p == "background":
run_in_background = True

call_params["item"] = item
call_params["run_in_background"] = run_in_background
else:
raise CommandParameterError(f"Option '{params[0]}' is not supported: '{command} {params[0]}'")

except IndexError:
raise CommandParameterError(f"The command '{params}' contains insufficient number of parameters")

method = f"{command}_{params[0]}"
prms = call_params
prms["user"] = default_user
prms["user_group"] = default_user_group

return method, prms


def msg_queue_item(params):
"""
Generate outgoing message for `queue item` command. The supported options are ``get``,
Expand Down Expand Up @@ -713,6 +798,16 @@ def create_msg(params):
else:
raise CommandParameterError(f"Request '{command} {params[0]}' is not supported")

# ----------- existing ------------
elif command == "existing":
if len(params) != 1:
raise CommandParameterError(f"Request '{command}' must include only one parameter")
supported_params = ("plans", "devices")
if params[0] in supported_params:
method = f"{params[0]}_{command}"
else:
raise CommandParameterError(f"Request '{command} {params[0]}' is not supported")

# ----------- permissions ------------
elif command == "permissions":
if len(params) not in (1, 2):
Expand All @@ -727,6 +822,61 @@ def create_msg(params):
else:
raise CommandParameterError(f"Request '{command} {params[0]}' is not supported")

# ----------- script ------------
elif command == "script":
if len(params) < 2:
raise CommandParameterError(f"Request '{command}' must include at least two parameters")
if params[0] == "upload":
# Parameter 1 (required) - file name, parameter 2 (optional) - 'background'
file_name = params[1]
try:
with open(file_name, "r") as f:
script = f.read()
except Exception as ex:
raise CommandParameterError(
f"Request '{command}': failed to read the script from file '{file_name}': {ex}"
)

run_in_background = False
update_re = False
allowed_values = ("background", "update-re")

for p in params[2:]:
if p not in allowed_values:
raise CommandParameterError(
f"Request '{command}': parameter combination {params} is not supported"
)
if p == "background":
run_in_background = True
elif p == "update-re":
update_re = True

method = f"{command}_{params[0]}"
prms = {"script": script, "run_in_background": run_in_background, "update_re": update_re}

else:
raise CommandParameterError(f"Request '{command} {params[0]}' is not supported")

# ----------- task ------------
elif command == "task":
if len(params) != 3:
raise CommandParameterError(f"Request '{command}' must include at 3 parameters")
if (params[0] == "result") and (params[1] == "get"):
task_uid = str(params[2])
method = f"{command}_{params[0]}_{params[1]}"
prms = {"task_uid": task_uid}
else:
raise CommandParameterError(f"Request '{command} {params[0]} {params[1]}' is not supported")

# ----------- function ------------
elif command == "function":
if len(params) < 1:
raise CommandParameterError(f"Request '{command}' must include at least one parameter")
if params[0] == "execute":
method, prms = msg_function_execute(params, cmd_opt=params[0])
else:
raise CommandParameterError(f"Request '{command} {params[0]}' is not supported")

# ----------- queue ------------
elif command == "queue":
if len(params) < 1:
Expand Down Expand Up @@ -815,6 +965,31 @@ def create_msg(params):
return method, prms, monitoring_mode


def prepare_qserver_output(msg):
"""
Prepare received message for printing by 'qserver' CLI tool.
Parameters
----------
msg: dict
Full message received from RE Manager.
Returns
-------
str
Formatted message, which is ready for printing.
"""
msg = copy.deepcopy(msg)
# Do not print large dicts in the log: replace values with "..."
large_dicts = ("plans_allowed", "plans_existing", "devices_allowed", "devices_existing")
for dict_name in large_dicts:
if dict_name in msg:
d = msg[dict_name]
for k in d.keys():
d[k] = "{...}"
return pprint.pformat(msg)


def qserver():

logging.basicConfig(level=logging.WARNING)
Expand Down Expand Up @@ -870,7 +1045,6 @@ def formatter(prog):
raise CommandParameterError(f"ZMQ public key is improperly formatted: {ex}")

method, params, monitoring_mode = create_msg(args.command)

if monitoring_mode:
print("Running QServer monitor. Press Ctrl-C to exit ...")

Expand All @@ -883,7 +1057,7 @@ def formatter(prog):
current_time = now.strftime("%H:%M:%S")

if not msg_err:
print(f"{current_time} - MESSAGE: {pprint.pformat(msg)}")
print(f"{current_time} - MESSAGE: \n{prepare_qserver_output(msg)}")
if isinstance(msg, dict) and ("success" in msg) and (msg["success"] is False):
exit_code = QServerExitCodes.REQUEST_FAILED
else:
Expand Down
2 changes: 1 addition & 1 deletion bluesky_queueserver/manager/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def wait_for_task_result(time, task_uid):
while ttime.time() < time_stop:
ttime.sleep(dt / 2)
try:
resp, _ = zmq_secure_request("task_load_result", params={"task_uid": task_uid})
resp, _ = zmq_secure_request("task_result_get", params={"task_uid": task_uid})

assert resp["success"] is True, f"Request for task result failed: {resp['msg']}"
assert resp["task_uid"] == task_uid
Expand Down
Loading

0 comments on commit 8dafdac

Please sign in to comment.