Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable JSON encoding for interprocess communication #301

Merged
merged 16 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions bluesky_queueserver/manager/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

import zmq
import zmq.asyncio
from jsonrpc import JSONRPCResponseManager
from jsonrpc.dispatcher import Dispatcher

from .json_rpc import JSONRPCResponseManager
from .logging_setup import PPrintForLogging as ppfl

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -99,6 +98,10 @@ class PipeJsonRpcReceive:
----------
conn: multiprocessing.Connection
Reference to bidirectional end of a pipe (multiprocessing.Pipe)
use_json: boolean
If *True*, the messages are expected to be in encoded as JSON. Otherwise the messages
are expected to be binary. The parameter also enables/disables JSON encoding of
response messages.
name: str
Name of the receiving thread (it is better to assign meaningful unique names to threads.

Expand All @@ -108,7 +111,7 @@ class PipeJsonRpcReceive:
.. code-block:: python

conn1, conn2 = multiprocessing.Pipe()
pc = PipeJsonRPC(conn=conn1, name="RE QServer Receive")
pc = PipeJsonRPC(conn=conn1, use_json=True, name="RE QServer Receive")

def func():
print("Testing")
Expand All @@ -119,9 +122,9 @@ def func():
pc.stop() # Stop before exit to stop the thread.
"""

def __init__(self, conn, *, name="RE QServer Comm"):
def __init__(self, conn, *, use_json=True, name="RE QServer Comm"):
self._conn = conn
self._dispatcher = Dispatcher() # json-rpc dispatcher
self._response_manager = JSONRPCResponseManager(use_json=use_json)
self._thread_running = False # Set True to exit the thread

self._thread_name = name
Expand Down Expand Up @@ -165,19 +168,19 @@ def stop(self):
def __del__(self):
self.stop()

def add_method(self, handler, name=None):
def add_method(self, handler, name):
"""
Add method to json-rpc dispatcher.

Parameters
----------
handler: callable
Reference to a handler
name: str, optional
Name to register (default is the handler name)
name: str
Name to register
"""
# Add method to json-rpc dispatcher
self._dispatcher.add_method(handler, name)
self._response_manager.add_method(handler, name)

def _start_conn_thread(self):
if not self._thread_running:
Expand Down Expand Up @@ -243,9 +246,8 @@ def _handle_msg(self, msg):
# if not isinstance(msg_json, dict) or (msg_json["method"] != "heartbeat"):
# logger.debug("Command received RE Manager->Watchdog: %s", ppfl(msg_json))

response = JSONRPCResponseManager.handle(msg, self._dispatcher)
response = self._response_manager.handle(msg)
if response:
response = response.json
self._conn.send(response)


Expand All @@ -263,6 +265,9 @@ class PipeJsonRpcSendAsync:
Reference to bidirectional end of a pipe (multiprocessing.Pipe)
timeout: float
Default value of timeout: maximum time to wait for response after a message is sent
use_json: boolean
Enables/disables encoding of the outgoing messages as JSON. If *True*, then the response
messages are also expected to be encoded as JSON. Otherwise the messages are binary.
name: str
Name of the receiving thread (it is better to assign meaningful unique names to threads.

Expand All @@ -275,7 +280,7 @@ class PipeJsonRpcSendAsync:

async def send_messages():
# Must be instantiated and used within the loop
p_send = PipeJsonRpcSendAsync(conn=conn1, name="comm-client")
p_send = PipeJsonRpcSendAsync(conn=conn1, use_json=True, name="comm-client")
p_send.start()

method = "method_name"
Expand All @@ -288,7 +293,7 @@ async def send_messages():
pc.stop()


pc = PipeJsonRpcSendAsync(conn=conn1, name="RE QServer Receive")
pc = PipeJsonRpcSendAsync(conn=conn1, use_json=True, name="RE QServer Receive")

def func():
print("Testing")
Expand All @@ -300,9 +305,10 @@ def func():

"""

def __init__(self, conn, *, timeout=0.5, name="RE QServer Comm"):
def __init__(self, conn, *, timeout=0.5, use_json=True, name="RE QServer Comm"):
self._conn = conn
self._loop = asyncio.get_running_loop()
self._use_json = use_json

self._thread_name = name

Expand Down Expand Up @@ -496,7 +502,7 @@ def _pipe_receive(self):
if self._conn.poll(self._conn_polling_timeout):
try:
msg_json = self._conn.recv()
msg = json.loads(msg_json)
msg = json.loads(msg_json) if self._use_json else msg_json
# logger.debug("Message Watchdog->Manager received: '%s'", ppfl(msg))
# Messages should be handled in the event loop
self._loop.call_soon_threadsafe(self._conn_received, msg)
Expand All @@ -512,7 +518,7 @@ def _pipe_send(self):
msg = None
try:
msg, fut_send = self._msg_send_buffer.get(timeout=self._conn_polling_timeout)
msg_json = json.dumps(msg)
msg_json = json.dumps(msg) if self._use_json else msg
self._conn.send(msg_json)
self._loop.call_soon_threadsafe(self._conn_sent, msg, fut_send)
except queue.Empty:
Expand Down
230 changes: 230 additions & 0 deletions bluesky_queueserver/manager/json_rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
import inspect
import json
import logging

logger = logging.getLogger(__name__)


class JSONRPCResponseManager:
def __init__(self, *, use_json=True):
"""
Simplified implementation of message handler for JSON RPC protocol.
Written as a replacement for ``json-rpc`` package and supports all features
used by Queue Server. This implementation also supports binary messages
(not encoded as JSON), which significatly speeds up interprocess communication.

Parameters
----------
use_json: boolean
If the parameter is *True*, then incoming messages (passed to the ``handle``
method) are expected to be in JSON format. Otherwise, the messages are expected
to be dictionary or a list of dictionaries and no decoding is applied.
"""
self._methods = {}
self._use_json = use_json

def add_method(self, handler, method):
"""
Register a handler.

Parameters
----------
handler: Callable
Method handler.
method: str
Method name. JSON messages must call one of the registered methods.

Returns
-------
None
"""
self._methods[method] = handler

def _decode(self, msg):
"""
Decode the incoming message from JSON (if ``use_json`` is *True*) or return
the message unchanged.

Parameters
----------
msg: str, dict or list(dict)
Encoded message in JSON format (*str*) or unencoded message (*dict* or
*list(dict)*).

Returns
-------
dict or list(dict)
Decoded message.
"""
if self._use_json:
return json.loads(msg)
else:
return msg

def _encode(self, msg):
"""
Encode the response message to JSON (if ``use_json`` is *True*) or return
the message unchanged.

Parameters
----------
msg: dict or list(dict)
A single message (*dict*) or a batch of messages (*list(dict)*).

Returns
-------
str, dict or list(dict)
Encoded response message in JSON format (*str*) or original representation
(*dict* or *list(dict)*).
"""
if self._use_json:
return json.dumps(msg)
else:
return msg

def _get_error_msg(self, error_code):
"""
Returns a standard JSON RPC message based on the error code.

Parameters
----------
error_code: int
One of the standard JSON RPC error codes.

Returns
-------
str
Standard message based on ``error_code``.
"""
msgs = {
-32700: "Parse error",
-32600: "Invalid request",
-32601: "Method not found",
-32602: "Invalid params",
-32603: "Internal error",
-32000: "Server error",
}
return msgs.get(error_code, "Unknown error")

def _handle_single_msg(self, msg):
"""
Handle a single JSON RPC message.

Parameters
----------
msg: dict
Decoded JSON RPC message.

Returns
-------
response: dict
Response message
"""
error_code, is_notification, msg_id, response = 0, "id" not in msg, None, None

try:
if not isinstance(msg, dict) or "method" not in msg or "jsonrpc" not in msg or msg["jsonrpc"] != "2.0":
error_code = -32600
raise TypeError(f"Invalid message format: {msg!r}")

method = msg["method"]
params = msg.get("params", {})
msg_id = msg.get("id", None)

if not isinstance(params, (tuple, list, dict)):
error_code = -32602
raise TypeError(f"Invalid params in the message {msg!r}")

handler = self._methods.get(method)
if handler is None:
error_code = -32601
raise TypeError(f"Unknown method: {method}")

try:
if isinstance(params, dict):
inspect.getcallargs(handler, **params)
else:
inspect.getcallargs(handler, *params)
except Exception as ex:
error_code = -32602
raise TypeError(f"Invalid params in the message {msg!r}: {ex}") from ex

try:
if isinstance(params, dict):
result = handler(**params)
else: # Tuple or list
result = handler(*params)
if not is_notification:
response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
except Exception:
error_code = -32000
raise

except Exception as ex:
if not is_notification:
data = {"type": ex.__class__.__name__, "message": str(ex)}
error = {"code": error_code, "message": self._get_error_msg(error_code), "data": data}
response = {"jsonrpc": "2.0", "id": msg_id, "error": error}

return response

def handle(self, msg_full):
"""
Handle JSON RPC message. The message can contain a single message (*dict*) or a batch of messages
(*list(dict)*). Messages in the batch are executed one by one. The response is also a single
message if input message is *dict* or a batch of messages if the input message is *list(dict)*.

If the response value returned by the function is *None*, it should not be sent to client.
It happens when the input message is a notification ('id' is missing) or all messages in
the batch are notifications. Responses to notifications are not included in the batch of
the response messages, so the response batch may contain less messages than the input batch.

If an input message can not be decoded (invalid JSON), then the response has 'id' set to *None*.

Parameters
----------
msg_full: str, dict or list(dict)
Input message encoded as JSON (*str*) or not encoded (single message represented as *dict* or
a batch of messages represented as *list(dict)*). The constructor parameter ``use_json`` must
be *True* to use JSON encoding and *False* otherwise.

Response
--------
str, dict, list(dict) or None
Output message or a batch of messages in the same format as ``msg_full``. The response
is *None* if there is nothing to send back to the client.
"""
response, is_batch = [], False

try:
try:
msg_full = self._decode(msg_full)
except Exception as ex:
raise TypeError(f"Failed to parse the message '{msg_full}': {ex}") from ex

is_batch = isinstance(msg_full, list)
if not is_batch:
msg_full = [msg_full]

for msg in msg_full:
single_response = self._handle_single_msg(msg)
if single_response:
response.append(single_response)

if not response:
response = None
elif not is_batch:
response = response[0]

if response:
response = self._encode(response)
except Exception as ex:
data = {"type": ex.__class__.__name__, "message": str(ex)}
response = {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": self._get_error_msg(-32700), "data": data},
}
response = self._encode(response)

return response
3 changes: 3 additions & 0 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3634,10 +3634,12 @@ async def zmq_server_comm(self):

self._comm_to_watchdog = PipeJsonRpcSendAsync(
conn=self._watchdog_conn,
use_json=False,
name="RE Manager-Watchdog Comm",
)
self._comm_to_worker = PipeJsonRpcSendAsync(
conn=self._worker_conn,
use_json=False,
name="RE Manager-Worker Comm",
timeout=self._comm_to_worker_timeout,
)
Expand Down Expand Up @@ -3814,6 +3816,7 @@ async def zmq_server_comm(self):
self._heartbeat_generator_task.cancel()
self._comm_to_watchdog.stop()
self._comm_to_worker.stop()
await self._plan_queue.stop()
self._zmq_socket.close()
logger.info("RE Manager was stopped by ZMQ command.")
break
Expand Down
Loading
Loading