diff --git a/bluesky_queueserver/manager/comms.py b/bluesky_queueserver/manager/comms.py index 08114647..40d134f1 100644 --- a/bluesky_queueserver/manager/comms.py +++ b/bluesky_queueserver/manager/comms.py @@ -7,9 +7,8 @@ import zmq import zmq.asyncio -from jsonrpc import JSONRPCResponseManager -from jsonrpc.dispatcher import Dispatcher +from .json_rpc import JSONRPCResponseManager from .logging_setup import PPrintForLogging as ppfl logger = logging.getLogger(__name__) @@ -99,6 +98,10 @@ class PipeJsonRpcReceive: ---------- conn: multiprocessing.Connection Reference to bidirectional end of a pipe (multiprocessing.Pipe) + use_json: boolean + If *True*, the messages are expected to be in encoded as JSON. Otherwise the messages + are expected to be binary. The parameter also enables/disables JSON encoding of + response messages. name: str Name of the receiving thread (it is better to assign meaningful unique names to threads. @@ -108,7 +111,7 @@ class PipeJsonRpcReceive: .. code-block:: python conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRPC(conn=conn1, name="RE QServer Receive") + pc = PipeJsonRPC(conn=conn1, use_json=True, name="RE QServer Receive") def func(): print("Testing") @@ -119,9 +122,9 @@ def func(): pc.stop() # Stop before exit to stop the thread. """ - def __init__(self, conn, *, name="RE QServer Comm"): + def __init__(self, conn, *, use_json=True, name="RE QServer Comm"): self._conn = conn - self._dispatcher = Dispatcher() # json-rpc dispatcher + self._response_manager = JSONRPCResponseManager(use_json=use_json) self._thread_running = False # Set True to exit the thread self._thread_name = name @@ -165,7 +168,7 @@ def stop(self): def __del__(self): self.stop() - def add_method(self, handler, name=None): + def add_method(self, handler, name): """ Add method to json-rpc dispatcher. @@ -173,11 +176,11 @@ def add_method(self, handler, name=None): ---------- handler: callable Reference to a handler - name: str, optional - Name to register (default is the handler name) + name: str + Name to register """ # Add method to json-rpc dispatcher - self._dispatcher.add_method(handler, name) + self._response_manager.add_method(handler, name) def _start_conn_thread(self): if not self._thread_running: @@ -243,9 +246,8 @@ def _handle_msg(self, msg): # if not isinstance(msg_json, dict) or (msg_json["method"] != "heartbeat"): # logger.debug("Command received RE Manager->Watchdog: %s", ppfl(msg_json)) - response = JSONRPCResponseManager.handle(msg, self._dispatcher) + response = self._response_manager.handle(msg) if response: - response = response.json self._conn.send(response) @@ -263,6 +265,9 @@ class PipeJsonRpcSendAsync: Reference to bidirectional end of a pipe (multiprocessing.Pipe) timeout: float Default value of timeout: maximum time to wait for response after a message is sent + use_json: boolean + Enables/disables encoding of the outgoing messages as JSON. If *True*, then the response + messages are also expected to be encoded as JSON. Otherwise the messages are binary. name: str Name of the receiving thread (it is better to assign meaningful unique names to threads. @@ -275,7 +280,7 @@ class PipeJsonRpcSendAsync: async def send_messages(): # Must be instantiated and used within the loop - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, use_json=True, name="comm-client") p_send.start() method = "method_name" @@ -288,7 +293,7 @@ async def send_messages(): pc.stop() - pc = PipeJsonRpcSendAsync(conn=conn1, name="RE QServer Receive") + pc = PipeJsonRpcSendAsync(conn=conn1, use_json=True, name="RE QServer Receive") def func(): print("Testing") @@ -300,9 +305,10 @@ def func(): """ - def __init__(self, conn, *, timeout=0.5, name="RE QServer Comm"): + def __init__(self, conn, *, timeout=0.5, use_json=True, name="RE QServer Comm"): self._conn = conn self._loop = asyncio.get_running_loop() + self._use_json = use_json self._thread_name = name @@ -496,7 +502,7 @@ def _pipe_receive(self): if self._conn.poll(self._conn_polling_timeout): try: msg_json = self._conn.recv() - msg = json.loads(msg_json) + msg = json.loads(msg_json) if self._use_json else msg_json # logger.debug("Message Watchdog->Manager received: '%s'", ppfl(msg)) # Messages should be handled in the event loop self._loop.call_soon_threadsafe(self._conn_received, msg) @@ -512,7 +518,7 @@ def _pipe_send(self): msg = None try: msg, fut_send = self._msg_send_buffer.get(timeout=self._conn_polling_timeout) - msg_json = json.dumps(msg) + msg_json = json.dumps(msg) if self._use_json else msg self._conn.send(msg_json) self._loop.call_soon_threadsafe(self._conn_sent, msg, fut_send) except queue.Empty: diff --git a/bluesky_queueserver/manager/json_rpc.py b/bluesky_queueserver/manager/json_rpc.py new file mode 100644 index 00000000..2bcba283 --- /dev/null +++ b/bluesky_queueserver/manager/json_rpc.py @@ -0,0 +1,230 @@ +import inspect +import json +import logging + +logger = logging.getLogger(__name__) + + +class JSONRPCResponseManager: + def __init__(self, *, use_json=True): + """ + Simplified implementation of message handler for JSON RPC protocol. + Written as a replacement for ``json-rpc`` package and supports all features + used by Queue Server. This implementation also supports binary messages + (not encoded as JSON), which significatly speeds up interprocess communication. + + Parameters + ---------- + use_json: boolean + If the parameter is *True*, then incoming messages (passed to the ``handle`` + method) are expected to be in JSON format. Otherwise, the messages are expected + to be dictionary or a list of dictionaries and no decoding is applied. + """ + self._methods = {} + self._use_json = use_json + + def add_method(self, handler, method): + """ + Register a handler. + + Parameters + ---------- + handler: Callable + Method handler. + method: str + Method name. JSON messages must call one of the registered methods. + + Returns + ------- + None + """ + self._methods[method] = handler + + def _decode(self, msg): + """ + Decode the incoming message from JSON (if ``use_json`` is *True*) or return + the message unchanged. + + Parameters + ---------- + msg: str, dict or list(dict) + Encoded message in JSON format (*str*) or unencoded message (*dict* or + *list(dict)*). + + Returns + ------- + dict or list(dict) + Decoded message. + """ + if self._use_json: + return json.loads(msg) + else: + return msg + + def _encode(self, msg): + """ + Encode the response message to JSON (if ``use_json`` is *True*) or return + the message unchanged. + + Parameters + ---------- + msg: dict or list(dict) + A single message (*dict*) or a batch of messages (*list(dict)*). + + Returns + ------- + str, dict or list(dict) + Encoded response message in JSON format (*str*) or original representation + (*dict* or *list(dict)*). + """ + if self._use_json: + return json.dumps(msg) + else: + return msg + + def _get_error_msg(self, error_code): + """ + Returns a standard JSON RPC message based on the error code. + + Parameters + ---------- + error_code: int + One of the standard JSON RPC error codes. + + Returns + ------- + str + Standard message based on ``error_code``. + """ + msgs = { + -32700: "Parse error", + -32600: "Invalid request", + -32601: "Method not found", + -32602: "Invalid params", + -32603: "Internal error", + -32000: "Server error", + } + return msgs.get(error_code, "Unknown error") + + def _handle_single_msg(self, msg): + """ + Handle a single JSON RPC message. + + Parameters + ---------- + msg: dict + Decoded JSON RPC message. + + Returns + ------- + response: dict + Response message + """ + error_code, is_notification, msg_id, response = 0, "id" not in msg, None, None + + try: + if not isinstance(msg, dict) or "method" not in msg or "jsonrpc" not in msg or msg["jsonrpc"] != "2.0": + error_code = -32600 + raise TypeError(f"Invalid message format: {msg!r}") + + method = msg["method"] + params = msg.get("params", {}) + msg_id = msg.get("id", None) + + if not isinstance(params, (tuple, list, dict)): + error_code = -32602 + raise TypeError(f"Invalid params in the message {msg!r}") + + handler = self._methods.get(method) + if handler is None: + error_code = -32601 + raise TypeError(f"Unknown method: {method}") + + try: + if isinstance(params, dict): + inspect.getcallargs(handler, **params) + else: + inspect.getcallargs(handler, *params) + except Exception as ex: + error_code = -32602 + raise TypeError(f"Invalid params in the message {msg!r}: {ex}") from ex + + try: + if isinstance(params, dict): + result = handler(**params) + else: # Tuple or list + result = handler(*params) + if not is_notification: + response = {"jsonrpc": "2.0", "id": msg_id, "result": result} + except Exception: + error_code = -32000 + raise + + except Exception as ex: + if not is_notification: + data = {"type": ex.__class__.__name__, "message": str(ex)} + error = {"code": error_code, "message": self._get_error_msg(error_code), "data": data} + response = {"jsonrpc": "2.0", "id": msg_id, "error": error} + + return response + + def handle(self, msg_full): + """ + Handle JSON RPC message. The message can contain a single message (*dict*) or a batch of messages + (*list(dict)*). Messages in the batch are executed one by one. The response is also a single + message if input message is *dict* or a batch of messages if the input message is *list(dict)*. + + If the response value returned by the function is *None*, it should not be sent to client. + It happens when the input message is a notification ('id' is missing) or all messages in + the batch are notifications. Responses to notifications are not included in the batch of + the response messages, so the response batch may contain less messages than the input batch. + + If an input message can not be decoded (invalid JSON), then the response has 'id' set to *None*. + + Parameters + ---------- + msg_full: str, dict or list(dict) + Input message encoded as JSON (*str*) or not encoded (single message represented as *dict* or + a batch of messages represented as *list(dict)*). The constructor parameter ``use_json`` must + be *True* to use JSON encoding and *False* otherwise. + + Response + -------- + str, dict, list(dict) or None + Output message or a batch of messages in the same format as ``msg_full``. The response + is *None* if there is nothing to send back to the client. + """ + response, is_batch = [], False + + try: + try: + msg_full = self._decode(msg_full) + except Exception as ex: + raise TypeError(f"Failed to parse the message '{msg_full}': {ex}") from ex + + is_batch = isinstance(msg_full, list) + if not is_batch: + msg_full = [msg_full] + + for msg in msg_full: + single_response = self._handle_single_msg(msg) + if single_response: + response.append(single_response) + + if not response: + response = None + elif not is_batch: + response = response[0] + + if response: + response = self._encode(response) + except Exception as ex: + data = {"type": ex.__class__.__name__, "message": str(ex)} + response = { + "jsonrpc": "2.0", + "id": None, + "error": {"code": -32700, "message": self._get_error_msg(-32700), "data": data}, + } + response = self._encode(response) + + return response diff --git a/bluesky_queueserver/manager/manager.py b/bluesky_queueserver/manager/manager.py index d7e00278..1a8c5f38 100644 --- a/bluesky_queueserver/manager/manager.py +++ b/bluesky_queueserver/manager/manager.py @@ -3634,10 +3634,12 @@ async def zmq_server_comm(self): self._comm_to_watchdog = PipeJsonRpcSendAsync( conn=self._watchdog_conn, + use_json=False, name="RE Manager-Watchdog Comm", ) self._comm_to_worker = PipeJsonRpcSendAsync( conn=self._worker_conn, + use_json=False, name="RE Manager-Worker Comm", timeout=self._comm_to_worker_timeout, ) @@ -3814,6 +3816,7 @@ async def zmq_server_comm(self): self._heartbeat_generator_task.cancel() self._comm_to_watchdog.stop() self._comm_to_worker.stop() + await self._plan_queue.stop() self._zmq_socket.close() logger.info("RE Manager was stopped by ZMQ command.") break diff --git a/bluesky_queueserver/manager/plan_queue_ops.py b/bluesky_queueserver/manager/plan_queue_ops.py index 7103faa2..fd77d325 100644 --- a/bluesky_queueserver/manager/plan_queue_ops.py +++ b/bluesky_queueserver/manager/plan_queue_ops.py @@ -65,6 +65,9 @@ class PlanQueueOperations: # Assume that we paused and then stopped the plan. Clear the running plan and # push it back to the queue. Also create the respective history entry. plan = await pq.set_processed_item_as_stopped(exit_status="stopped") + + # 'stopping' disconnects all connections. This step is not required in normal use. + await pq.stop() """ def __init__(self, redis_host="localhost", name_prefix="qs_default"): @@ -273,6 +276,14 @@ async def start(self): self._plan_queue_uid = self.new_item_uid() self._plan_history_uid = self.new_item_uid() + async def stop(self): + """ + Close all connections in the pool. + """ + if self._r_pool: + await self._r_pool.aclose() + self._r_pool = None + async def _queue_clean(self): """ Delete all the invalid queue entries (there could be some entries from failed unit tests). diff --git a/bluesky_queueserver/manager/start_manager.py b/bluesky_queueserver/manager/start_manager.py index 714c615a..7517faa4 100644 --- a/bluesky_queueserver/manager/start_manager.py +++ b/bluesky_queueserver/manager/start_manager.py @@ -3,6 +3,8 @@ import logging import os import re +import signal +import sys import threading import time as ttime from multiprocessing import Pipe, Queue @@ -56,7 +58,7 @@ def __init__( # Class that supports communication over the pipe self._comm_to_manager = PipeJsonRpcReceive( - conn=self._watchdog_to_manager_conn, name="RE Watchdog-Manager Comm" + conn=self._watchdog_to_manager_conn, use_json=False, name="RE Watchdog-Manager Comm" ) self._watchdog_state = 0 # State is currently just time since last notification @@ -222,7 +224,58 @@ def run(self): logger.info("RE Watchdog is stopped") +class AtTerm: + def __init__(self): + """ + Replaces the standard handler for SIGTERM (not SIGKILL). Executes the list of + registered functions before calling the standard handler. + + Examples + -------- + + .. code-block:: + + # Configuring the AtTerm object + atterm = AtTerm() + + # Somewhere at the beginning of the program + atterm.replace_sigterm_handler() + + # Register functions to be executed at SIGTERM + def cleanup(): + # + + atterm.register(cleanup) + """ + self._sigterm_standard_handler = None + self._registered_funcs = [] + + def _sigterm_custom_handler(self, signum, frame): + logger.info("Terminating the process ...") + for func in reversed(self._registered_funcs): + func() + signal.signal(signal.SIGTERM, self._sigterm_standard_handler) + sys.exit(1) + + def replace_sigterm_handler(self): + """ + Replaces standard handler for SIGTERM with the custom handler. The custom + handler calls the standard handler after calling all registered functions + in the reverse order. + """ + self._sigterm_standard_handler = signal.getsignal(signal.SIGTERM) + signal.signal(signal.SIGTERM, self._sigterm_custom_handler) + + def register(self, func): + """ + Register a function (signature has no parameters and returns no values). + Registered functions are executed in reverse order when processing 'SIGTERM'. + """ + self._registered_funcs.append(func) + + def start_manager(): + s_enc = ( "Encryption for ZeroMQ communication server may be enabled by setting the value of\n" "'QSERVER_ZMQ_PRIVATE_KEY_FOR_SERVER' environment variable to a valid private key\n" @@ -237,6 +290,9 @@ def formatter(prog): # Set maximum width such that printed help mostly fits in the RTD theme code block (documentation). return argparse.RawDescriptionHelpFormatter(prog, max_help_position=20, width=90) + atterm = AtTerm() + atterm.replace_sigterm_handler() + parser = argparse.ArgumentParser( description=f"Start Run Engine (RE) Manager\nbluesky-queueserver version {__version__}\n\n{s_enc}", formatter_class=formatter, @@ -799,6 +855,7 @@ def kill_all_processes(): # Make sure that all processes are killed before exit atexit.register(kill_all_processes) + atterm.register(kill_all_processes) wp.run() except KeyboardInterrupt: diff --git a/bluesky_queueserver/manager/tests/common.py b/bluesky_queueserver/manager/tests/common.py index 8dac5900..e31c96ef 100644 --- a/bluesky_queueserver/manager/tests/common.py +++ b/bluesky_queueserver/manager/tests/common.py @@ -362,6 +362,7 @@ async def run(): await pq.lock_info_clear() await pq.autostart_mode_clear() await pq.stop_pending_clear() + await pq.stop() asyncio.run(run()) @@ -513,7 +514,7 @@ def stop_manager(self, timeout=10, cleanup=True): except Exception as ex: # The manager is not responsive, so just kill the process. if self._p: - self._p.kill() + self._p.terminate() self._p.wait(timeout) assert False, f"RE Manager failed to stop: {str(ex)}" @@ -533,7 +534,7 @@ def kill_manager(self, timeout=10): Timeout in seconds. """ if self._p: - self._p.kill() + self._p.terminate() self._p.wait(timeout) clear_redis_pool(redis_name_prefix=self._used_redis_name_prefix) @@ -569,6 +570,7 @@ def _create(params, *, stdout, stderr, set_redis_name_prefix): # Wait until RE Manager is started. Raise exception if the server failed to start. if not wait_for_condition(time=10, condition=condition_manager_idle): failed_to_start = True + re["re"].kill_manager() raise TimeoutError("Timeout: RE Manager failed to start.") _reset_queue_mode() @@ -606,6 +608,7 @@ def re_manager(): # Wait until RE Manager is started. Raise exception if the server failed to start. if not wait_for_condition(time=10, condition=condition_manager_idle): failed_to_start = True + re.kill_manager() raise TimeoutError("Timeout: RE Manager failed to start.") _reset_queue_mode() @@ -630,6 +633,7 @@ def re_manager_pc_copy(tmp_path): # Wait until RE Manager is started. Raise exception if the server failed to start. if not wait_for_condition(time=10, condition=condition_manager_idle): failed_to_start = True + re.kill_manager() raise TimeoutError("Timeout: RE Manager failed to start.") _reset_queue_mode() diff --git a/bluesky_queueserver/manager/tests/test_comms.py b/bluesky_queueserver/manager/tests/test_comms.py index 6006ba42..69cb129a 100644 --- a/bluesky_queueserver/manager/tests/test_comms.py +++ b/bluesky_queueserver/manager/tests/test_comms.py @@ -12,7 +12,6 @@ from bluesky_queueserver.manager.comms import ( CommJsonRpcError, CommTimeoutError, - JSONRPCResponseManager, PipeJsonRpcReceive, PipeJsonRpcSendAsync, ZMQCommSendAsync, @@ -88,16 +87,19 @@ def test_CommJsonRpcError_3_fail(): # Class PipeJsonRpcReceive -def test_PipeJsonRpcReceive_1(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_1(use_json): """ Create, start and stop `PipeJsonRpcReceive` object """ conn1, conn2 = multiprocessing.Pipe() - new_name = "Unusual Thread Name" + new_name = f"Unusual Thread Name ({use_json})" assert count_threads_with_name(new_name) == 0, "No threads are expected to exist" - pc = PipeJsonRpcReceive(conn=conn2, name=new_name) + pc = PipeJsonRpcReceive(conn=conn2, name=new_name, use_json=use_json) pc.start() assert count_threads_with_name(new_name) == 2, "Two threads are expected to exist" @@ -112,12 +114,15 @@ def test_PipeJsonRpcReceive_1(): pc.stop() +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) @pytest.mark.parametrize( "method, params, result, notification", [ - ("method_handler1", [], 5, False), - ("method_handler1", [], 5, True), ("method1", [], 5, False), + ("method1", [], 5, True), + ("method1", {}, 5, False), + ("method1", {}, 5, True), ("method2", [5], 15, False), ("method2", {"value": 5}, 15, False), ("method2", {}, 12, False), @@ -127,7 +132,8 @@ def test_PipeJsonRpcReceive_1(): ("method4", {}, 19, False), ], ) -def test_PipeJsonRpcReceive_2(method, params, result, notification): +# fmt: on +def test_PipeJsonRpcReceive_2(method, params, result, notification, use_json): """ The case of single requests. """ @@ -157,8 +163,7 @@ def method_handler4(self, *, value=4): some_class = SomeClass() conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) - pc.add_method(method_handler1) # No name is specified, default name is "method_handler1" + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler1, "method1") pc.add_method(method_handler2, "method2") pc.add_method(method_handler3, "method3") @@ -169,12 +174,13 @@ def method_handler4(self, *, value=4): value_nonlocal = None request = format_jsonrpc_msg(method, params, notification=notification) - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): # Set timeout large enough if not notification: response = conn1.recv() - response = json.loads(response) - assert response["id"] == request["id"], "Response ID does not match message ID." + response = json.loads(response) if use_json else response + assert response["id"] == request["id"], f"Response ID does not match message ID: {response}" assert "result" in response, f"Key 'result' is not contained in response: {response}" assert response["result"] == result, f"Result does not match the expected: {response}" assert value_nonlocal == "function_was_called", "Non-local variable has incorrect value" @@ -190,7 +196,10 @@ def method_handler4(self, *, value=4): pc.stop() -def test_PipeJsonRpcReceive_3(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_3(use_json): """ Test sending multiple requests """ @@ -205,7 +214,7 @@ def method_handler4(self, *, value=4): some_class = SomeClass() conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler3, "method3") pc.add_method(some_class.method_handler4, "method4") pc.start() @@ -216,11 +225,12 @@ def method_handler4(self, *, value=4): format_jsonrpc_msg("method4", {"value": 3}, notification=True), format_jsonrpc_msg("method4", {"value": 9}), ] - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): # Set timeout large enough response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert len(response) == 2, "Unexpected number of response messages" assert response[0]["id"] == request[0]["id"], "Response ID does not match message ID." assert response[1]["id"] == request[2]["id"], "Response ID does not match message ID." @@ -232,7 +242,10 @@ def method_handler4(self, *, value=4): pc.stop() -def test_PipeJsonRpcReceive_4(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_4(use_json): """ Test if all outdated unprocessed messages are deleted from the pipe as the processing thread is started. @@ -242,7 +255,7 @@ def method_handler3(*, value=3): return value + 15 conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler3, "method3") # The thread is not started yet, but we still send a message through the pipe. @@ -253,8 +266,10 @@ def method_handler3(*, value=3): request1b = [ format_jsonrpc_msg("method3", {"value": 6}), ] - conn1.send(json.dumps(request1a)) - conn1.send(json.dumps(request1b)) + request1a_json = json.dumps(request1a) if use_json else request1a + request1b_json = json.dumps(request1b) if use_json else request1b + conn1.send(request1a_json) + conn1.send(request1b_json) # Start the processing thread. The messages that were already sent are expected to be ignored. pc.start() @@ -263,11 +278,12 @@ def method_handler3(*, value=3): request2 = [ format_jsonrpc_msg("method3", {"value": 7}), ] - conn1.send(json.dumps(request2)) + request2_json = json.dumps(request2) if use_json else request2 + conn1.send(request2_json) if conn1.poll(timeout=0.5): # Set timeout large enough response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert len(response) == 1, "Unexpected number of response messages" assert response[0]["id"] == request2[0]["id"], "Response ID does not match message ID." assert response[0]["result"] == 22, "Response ID does not match message ID." @@ -278,9 +294,10 @@ def method_handler3(*, value=3): # fmt: off +@pytest.mark.parametrize("use_json", [False, True]) @pytest.mark.parametrize("clear_buffer", [False, True]) # fmt: on -def test_PipeJsonRpcReceive_5(clear_buffer): +def test_PipeJsonRpcReceive_5(clear_buffer, use_json): """ Checking that the buffer overflow does not overflow the pipe. """ @@ -294,7 +311,7 @@ def method_handler3(): n_calls += 1 conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler3, "method3") pc.start() @@ -303,10 +320,11 @@ def method_handler3(): n_buf = pc._msg_recv_buffer_size - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) ttime.sleep(1) for _ in range(n_buf * 2): - conn1.send(json.dumps(request)) + conn1.send(request_json) ttime.sleep(1) @@ -322,7 +340,10 @@ def method_handler3(): pc.stop() -def test_PipeJsonRpcReceive_6_failing(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_6_failing(use_json): """ Those tests are a result of exploration of how `json-rpc` error processing works. """ @@ -331,19 +352,20 @@ def method_handler3(*, value=3): return value + 15 conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler3, "method3") pc.start() # ------- Incorrect argument (arg list instead of required kwargs) ------- # Returns 'Server Error' (-32000) request = format_jsonrpc_msg("method3", [5]) - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): # Set timeout large enough response = conn1.recv() - response = json.loads(response) - assert response["error"]["code"] == -32000, f"Incorrect error reported: {response}" + response = json.loads(response) if use_json else response + assert response["error"]["code"] == -32602, f"Incorrect error reported: {response}" assert response["error"]["data"]["type"] == "TypeError", "Incorrect error type." else: assert False, "Timeout occurred while waiting for response." @@ -352,11 +374,12 @@ def method_handler3(*, value=3): # 'json-prc' doesn't check parameter types. Instead the handler will crash if the argument # type is not suitable. request = format_jsonrpc_msg("method3", {"value": "abc"}) - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert response["error"]["code"] == -32000, f"Incorrect error reported: {response}" assert response["error"]["data"]["type"] == "TypeError", "Incorrect error type." else: @@ -364,22 +387,24 @@ def method_handler3(*, value=3): # ------- Incorrect argument (extra argument) ------- request = format_jsonrpc_msg("method3", {"value": 5, "unknown": 10}) - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert response["error"]["code"] == -32602, f"Incorrect error reported: {response}" else: assert False, "Timeout occurred while waiting for response." # ------- Non-existing method ('Method not found' error) ------- request = format_jsonrpc_msg("method_handler3", {"value": 5}) - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert response["error"]["code"] == -32601, f"Incorrect error reported: {response}" else: assert False, "Timeout occurred while waiting for response." @@ -387,7 +412,10 @@ def method_handler3(*, value=3): pc.stop() -def test_PipeJsonRpcReceive_7_failing(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_7_failing(use_json): """ Exception is raised inside the handler is causing 'Server Error' -32000. Returns error type (Exception type) and message. It is questionable whether @@ -402,16 +430,17 @@ def method_handler5(): raise RuntimeError("Function crashed ...") conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler5, "method5") pc.start() request = format_jsonrpc_msg("method5") - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): # Set timeout large enough response = conn1.recv() - response = json.loads(response) + response = json.loads(response) if use_json else response assert response["error"]["code"] == -32000, f"Incorrect error reported: {response}" assert response["error"]["data"]["type"] == "RuntimeError", "Incorrect error type." assert response["error"]["data"]["message"] == "Function crashed ...", "Incorrect message." @@ -421,7 +450,10 @@ def method_handler5(): pc.stop() -def test_PipeJsonRpcReceive_8_failing(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcReceive_8_failing(use_json): """ This is an example of handler to timeout. 'json-rpc' package can not handle timeouts. Care must be taken to write handles that execute quickly. Timeouts must be handled @@ -434,14 +466,14 @@ def method_handler6(): ttime.sleep(3) # Longer than 'poll' timeout conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2) + pc = PipeJsonRpcReceive(conn=conn2, use_json=use_json) pc.add_method(method_handler6, "method6") pc.start() # Non-existing method ('Method not found' error) request = format_jsonrpc_msg("method6") - - conn1.send(json.dumps(request)) + request_json = json.dumps(request) if use_json else request + conn1.send(request_json) if conn1.poll(timeout=0.5): # Set timeout large enough assert False, "The test is expected to timeout." @@ -455,12 +487,15 @@ def method_handler6(): # Class PipeJsonRpcSendAsync -def test_PipeJsonRpcSendAsync_1(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_1(use_json): """ Create, start and stop `PipeJsonRpcReceive` object """ conn1, conn2 = multiprocessing.Pipe() - new_name = "Unusual Thread Name" + new_name = f"Unusual Thread Name ({use_json})" async def object_start_stop(): assert count_threads_with_name(new_name) == 0, "No threads are expected to exist" @@ -482,12 +517,15 @@ async def object_start_stop(): asyncio.run(object_start_stop()) +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) @pytest.mark.parametrize( "method, params, result, notification", [ - ("method_handler1", [], 5, False), - ("method_handler1", [], 5, True), ("method1", [], 5, False), + ("method1", [], 5, True), + ("method1", {}, 5, False), + ("method1", {}, 5, True), ("method2", [5], 15, False), ("method2", {"value": 5}, 15, False), ("method2", {}, 12, False), @@ -497,7 +535,8 @@ async def object_start_stop(): ("method4", {}, 19, False), ], ) -def test_PipeJsonRpcSendAsync_2(method, params, result, notification): +# fmt: on +def test_PipeJsonRpcSendAsync_2(method, params, result, notification, use_json): """ Test of basic functionality. Here we don't test for timeout case (it raises an exception). """ @@ -527,8 +566,7 @@ def method_handler4(self, *, value=4): some_class = SomeClass() conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") - pc.add_method(method_handler1) # No name is specified, default name is "method_handler1" + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.add_method(method_handler2, "method2") pc.add_method(method_handler3, "method3") @@ -538,7 +576,7 @@ def method_handler4(self, *, value=4): async def send_messages(): nonlocal value_nonlocal - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() for n in range(3): @@ -557,7 +595,10 @@ async def send_messages(): pc.stop() -def test_PipeJsonRpcSendAsync_3(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_3(use_json): """ Put multiple messages to the loop at once. The should be processed one by one. """ @@ -573,12 +614,12 @@ def method_handler1(): return n_return conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() # Submit multiple messages at once. Messages should stay at the event loop @@ -598,7 +639,10 @@ async def send_messages(): pc.stop() -def test_PipeJsonRpcSendAsync_4(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_4(use_json): """ Message timeout. """ @@ -607,12 +651,12 @@ def method_handler1(): ttime.sleep(1) conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() # Submit multiple messages at once. Messages should stay at the event loop @@ -626,7 +670,10 @@ async def send_messages(): pc.stop() -def test_PipeJsonRpcSendAsync_5(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_5(use_json): """ Special test case. Two messages: the first message times out, the second message is send before the response @@ -643,13 +690,13 @@ def method_handler2(): return 56 conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.add_method(method_handler2, "method2") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() # Submit multiple messages at once. Messages should stay at the event loop @@ -673,15 +720,17 @@ class _PipeJsonRpcReceiveTest(PipeJsonRpcReceive): """ def _handle_msg(self, msg): - response = JSONRPCResponseManager.handle(msg, self._dispatcher) + response = self._response_manager.handle(msg) if response: - response = response.json self._conn.send(response) # Send the response 3 times !!! self._conn.send(response) self._conn.send(response) -def test_PipeJsonRpcSendAsync_6(caplog): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_6(caplog, use_json): """ Special test case. The receiving process responds with multiple replies (3) to a single request. Check that @@ -694,12 +743,12 @@ def method_handler1(): return 39 conn1, conn2 = multiprocessing.Pipe() - pc = _PipeJsonRpcReceiveTest(conn=conn2, name="comm-server") + pc = _PipeJsonRpcReceiveTest(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() result = await p_send.send_msg("method1", timeout=0.5) @@ -717,7 +766,10 @@ async def send_messages(): assert caplog.text.count(txt) == 2, caplog.text -def test_PipeJsonRpcSendAsync_7_fail(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_7_fail(use_json): """ Exception raised inside the method. """ @@ -726,12 +778,12 @@ def method_handler1(): raise RuntimeError("Function crashed ...") conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() # Submit multiple messages at once. Messages should stay at the event loop @@ -745,7 +797,10 @@ async def send_messages(): pc.stop() -def test_PipeJsonRpcSendAsync_8_fail(): +# fmt: off +@pytest.mark.parametrize("use_json", [False, True]) +# fmt: on +def test_PipeJsonRpcSendAsync_8_fail(use_json): """ Method not found (other `json-rpc` errors will raise the same exception). """ @@ -754,12 +809,12 @@ def method_handler1(): pass conn1, conn2 = multiprocessing.Pipe() - pc = PipeJsonRpcReceive(conn=conn2, name="comm-server") + pc = PipeJsonRpcReceive(conn=conn2, name="comm-server", use_json=use_json) pc.add_method(method_handler1, "method1") pc.start() async def send_messages(): - p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client") + p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client", use_json=use_json) p_send.start() # Submit multiple messages at once. Messages should stay at the event loop diff --git a/bluesky_queueserver/manager/tests/test_json_rpc.py b/bluesky_queueserver/manager/tests/test_json_rpc.py new file mode 100644 index 00000000..9243ce6d --- /dev/null +++ b/bluesky_queueserver/manager/tests/test_json_rpc.py @@ -0,0 +1,163 @@ +import json + +import pytest + +from ..json_rpc import JSONRPCResponseManager + + +# fmt: off +@pytest.mark.parametrize("use_json", [None, True, False]) +# fmt: on +def test_json_rpc_init(use_json): + if use_json is None: + params = {} + elif use_json in (True, False): + params = {"use_json": use_json} + else: + assert False, f"Unknown value of 'use_json': {use_json}" + + rm = JSONRPCResponseManager(**params) + assert rm._use_json == (use_json if use_json is not None else True) + + +def _method1(): + return 1 + + +def _method2(a): + return a + + +def _method3(): + pass + + +def _method4(): + raise RuntimeError("Error in 'method4'") + + +methods = { + "method1": _method1, + "method2": _method2, + "method3": _method3, + "method4": _method4, +} + + +# fmt: off +@pytest.mark.parametrize("use_json", [True, False]) +@pytest.mark.parametrize("msg_in, msg_resp", [ + ([], None), # Empty batch + ({"jsonrpc": "2.0", "method": "method1", "id": 1}, {'id': 1, 'jsonrpc': '2.0', 'result': 1}), + ({"jsonrpc": "2.0", "method": "method1"}, None), # Notification + ({"jsonrpc": "2.0", "method": "method1", "params": [], "id": 1}, {'id': 1, 'jsonrpc': '2.0', 'result': 1}), + ({"jsonrpc": "2.0", "method": "method1", "params": {}, "id": 1}, {'id': 1, 'jsonrpc': '2.0', 'result': 1}), + ({"jsonrpc": "2.0", "method": "method2", "params": [10], "id": 1}, + {'id': 1, 'jsonrpc': '2.0', 'result': 10}), + ({"jsonrpc": "2.0", "method": "method2", "params": {"a": 10}, "id": 1}, + {'id': 1, 'jsonrpc': '2.0', 'result': 10}), + + # Batch + ([{"jsonrpc": "2.0", "method": "method2", "params": [10], "id": 1}], + [{'id': 1, 'jsonrpc': '2.0', 'result': 10}]), + ([{"jsonrpc": "2.0", "method": "method2", "params": [10]}], None), # Notification + ([{"jsonrpc": "2.0", "method": "method2", "params": [10], "id": 1}, + {"jsonrpc": "2.0", "method": "method2", "params": [5]}], # Notification + [{'id': 1, 'jsonrpc': '2.0', 'result': 10}]), + + # Method returns None + ({"jsonrpc": "2.0", "method": "method3", "id": 1}, {'id': 1, 'jsonrpc': '2.0', 'result': None}), + + # Invalid request + ({"method": "method1", "id": 1}, # Missing 'jsonrpc' + {"jsonrpc": "2.0", "id": None, "error": # ID is None + {"code": -32600, "message": "Invalid request", "data": + {"type": "TypeError", "message": "Invalid message format: {'method': 'method1', 'id': 1}"}}}), + ({"json_rpc": "1.0", "method": "method1", "id": 1}, # Incorrect version of 'jsonrpc' + {"jsonrpc": "2.0", "id": None, "error": # ID is None + {"code": -32600, "message": "Invalid request", "data": + {"type": "TypeError", + "message": "Invalid message format: {'json_rpc': '1.0', 'method': 'method1', 'id': 1}"}}}), + ({"json_rpc": "2.0", "id": 1}, # 'method' is missing + {"jsonrpc": "2.0", "id": None, "error": # ID is None + {"code": -32600, "message": "Invalid request", "data": + {"type": "TypeError", "message": "Invalid message format: {'json_rpc': '2.0', 'id': 1}"}}}), + + # Unknown method + ({"jsonrpc": "2.0", "method": "unknown", "id": 1}, + {'jsonrpc': '2.0', 'id': 1, 'error': + {'code': -32601, 'message': 'Method not found', 'data': + {'type': 'TypeError', 'message': 'Unknown method: unknown'}}}), + + # Invalid params + ({"jsonrpc": "2.0", "method": "method2", "params": 10, "id": 1}, # Params should be list or dict + {"jsonrpc": "2.0", "id": 1, "error": + {"code": -32602, "message": "Invalid params", "data": + {"type": "TypeError", + "message": "Invalid params in the message {'jsonrpc': '2.0', 'method': 'method2', 'params': 10, 'id': 1}" + }}}), + ({"jsonrpc": "2.0", "method": "method2", "params": {"b": 10}, "id": 1}, # No param 'b' + {"jsonrpc": "2.0", "id": 1, "error": + {"code": -32602, "message": "Invalid params", "data": + {"type": "TypeError", + "message": ( + "Invalid params in the message {'jsonrpc': '2.0', 'method': 'method2', 'params': " + "{'b': 10}, 'id': 1}: _method2() got an unexpected keyword argument 'b'") + }}}), + + # Server error + ({"jsonrpc": "2.0", "method": "method4", "id": 1}, + {"jsonrpc": "2.0", "id": 1, "error": + {"code": -32000, "message": "Server error", "data": + {"type": "RuntimeError", "message": "Error in 'method4'"}}}), + + # Errors in batches of messages + ([{"jsonrpc": "2.0", "method": "unknown", "id": 2}, + {"jsonrpc": "2.0", "method": "method2", "params": [10], "id": 1}, + {"jsonrpc": "2.0", "method": "method4", "id": 3}], + [{'id': 2, 'jsonrpc': '2.0', 'error': + {'code': -32601, 'message': 'Method not found', 'data': + {'type': 'TypeError', 'message': 'Unknown method: unknown'}}}, + {'id': 1, 'jsonrpc': '2.0', 'result': 10}, + {"jsonrpc": "2.0", "id": 3, "error": + {"code": -32000, "message": "Server error", "data": + {"type": "RuntimeError", "message": "Error in 'method4'"}}} + ]), +]) +# fmt: on +def test_json_rpc_handle(use_json, msg_in, msg_resp): + rm = JSONRPCResponseManager(use_json=use_json) + for k, v in methods.items(): + rm.add_method(v, k) + + if use_json: + msg_in = json.dumps(msg_in) + resp = rm.handle(msg_in) + if use_json and resp is not None: + resp = json.loads(resp) + assert resp == msg_resp + + +def test_json_rpc_corrupt_message(): + """ + Test the case when the handler fails to decode JSON message. + """ + rm = JSONRPCResponseManager(use_json=True) + msg = "{'json_rpc}" + resp = rm.handle(msg) + resp = json.loads(resp) + assert resp == { + "jsonrpc": "2.0", + "id": None, + "error": { + "code": -32700, + "message": "Parse error", + "data": { + "type": "TypeError", + "message": ( + "Failed to parse the message '{'json_rpc}': Expecting property name " + "enclosed in double quotes: line 1 column 2 (char 1)" + ), + }, + }, + } diff --git a/bluesky_queueserver/manager/tests/test_plan_queue_ops.py b/bluesky_queueserver/manager/tests/test_plan_queue_ops.py index 6430f238..2dfb48bd 100644 --- a/bluesky_queueserver/manager/tests/test_plan_queue_ops.py +++ b/bluesky_queueserver/manager/tests/test_plan_queue_ops.py @@ -49,6 +49,26 @@ async def __aexit__(self, exc_t, exc_v, exc_tb): await self._pq.stop_pending_clear() +def test_pq_start_stop(): + """ + Test for the ``PlanQueueOperations.start()`` and ``PlanQueueOperations.stop()`` + """ + + async def testing(): + pq = PlanQueueOperations(name_prefix=_test_redis_name_prefix) + await pq.start() + await pq._r_pool.ping() + await pq.stop() + assert pq._r_pool is None + + await pq.start() + await pq._r_pool.ping() + await pq.stop() + assert pq._r_pool is None + + asyncio.run(testing()) + + # fmt: off @pytest.mark.parametrize("item_in, item_out", [ ({"name": "plan1", "item_uid": "abcde"}, {"name": "plan1", "item_uid": "abcde"}), diff --git a/bluesky_queueserver/manager/tests/test_scenarios.py b/bluesky_queueserver/manager/tests/test_scenarios.py index 3bbafe23..88a4c724 100644 --- a/bluesky_queueserver/manager/tests/test_scenarios.py +++ b/bluesky_queueserver/manager/tests/test_scenarios.py @@ -486,7 +486,6 @@ def unit_test_download_data(): # fmt: off @pytest.mark.parametrize("background", [False, True]) # fmt: on -@pytest.mark.xfail(reason="Test is unreliable on CI, but expected to pass locally") def test_large_datasets_01(re_manager, background): # noqa: F811 """ Submit large array as a function parameter. When running functions on background @@ -555,7 +554,6 @@ def plan_large_array(vlist, n): """ -@pytest.mark.xfail(reason="Test is unreliable on CI, but expected to pass locally") def test_large_datasets_02(re_manager): # noqa: F811 """ Submit large array as a plan parameter. Then download the queue that contains the plan. @@ -619,7 +617,6 @@ def test_large_datasets_02(re_manager): # noqa: F811 (_plan_move_then_count, 10000, 60000), ]) # fmt: on -@pytest.mark.xfail(reason="Test is unreliable on CI, but expected to pass locally") def test_large_datasets_03(re_manager, plan, n_plans, timeout_ms): # noqa: F811 """ Submit large number of plans to the queue as a batch. diff --git a/bluesky_queueserver/manager/tests/test_start_manager.py b/bluesky_queueserver/manager/tests/test_start_manager.py index 4be3c1c1..d0e97dd5 100644 --- a/bluesky_queueserver/manager/tests/test_start_manager.py +++ b/bluesky_queueserver/manager/tests/test_start_manager.py @@ -6,6 +6,9 @@ from bluesky_queueserver.manager.start_manager import WatchdogProcess from bluesky_queueserver.tests.common import format_jsonrpc_msg +# Must match the settings in WatchdogProcess class +_use_json = False + class ReManagerEmulation(threading.Thread): """ @@ -41,7 +44,7 @@ def _heartbeat(self): hb_period, dt = 0.5, 0.01 n_wait = round(hb_period / dt) msg = format_jsonrpc_msg("heartbeat", {"value": "alive"}, notification=True) - msg_json = json.dumps(msg) + msg_json = json.dumps(msg) if _use_json else msg while True: # Since we are emulating 'kill' method, we want the function to # react to 'exit' quickly. @@ -74,12 +77,13 @@ def send_msg_to_watchdog(self, method, params=None, *, notification=False, timeo # this is acceptable for testing. Timeout would typically indicate an error. msg = format_jsonrpc_msg(method, params, notification=notification) with self._lock: - self._conn_watchdog.send(json.dumps(msg)) + msg_json = json.dumps(msg) if _use_json else msg + self._conn_watchdog.send(msg_json) if notification: return if self._conn_watchdog.poll(timeout): response_json = self._conn_watchdog.recv() - response = json.loads(response_json) + response = json.loads(response_json) if _use_json else response_json result = response["result"] else: result = None @@ -101,8 +105,9 @@ def run(self): if not self._restart: msg = format_jsonrpc_msg("manager_stopping", notification=True) + msg_json = json.dumps(msg) if _use_json else msg with self._lock: - self._conn_watchdog.send(json.dumps(msg)) + self._conn_watchdog.send(msg_json) th_hb.join() diff --git a/bluesky_queueserver/manager/worker.py b/bluesky_queueserver/manager/worker.py index 14d912f2..cf851097 100644 --- a/bluesky_queueserver/manager/worker.py +++ b/bluesky_queueserver/manager/worker.py @@ -1346,7 +1346,7 @@ def _worker_prepare_for_startup(self): self._run_reg_cb = CallbackRegisterRun(run_list=self._active_run_list) # Class that supports communication over the pipe - self._comm_to_manager = PipeJsonRpcReceive(conn=self._conn, name="RE Worker-Manager Comm") + self._comm_to_manager = PipeJsonRpcReceive(conn=self._conn, use_json=False, name="RE Worker-Manager Comm") self._comm_to_manager.add_method(self._request_state_handler, "request_state") self._comm_to_manager.add_method(self._request_ip_connect_info, "request_ip_connect_info") diff --git a/requirements.txt b/requirements.txt index b6536c30..b9f6b64a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ bluesky-kafka databroker event-model ipykernel -json-rpc jsonschema jupyter-client>=7.4.2 jupyter-console @@ -15,6 +14,8 @@ numpydoc openpyxl ophyd packaging +pandas +pyarrow pydantic python-multipart pyyaml