From 3471d806d50ba15a5b5a9bc9adc62f3e6f816bc0 Mon Sep 17 00:00:00 2001 From: miro Date: Wed, 8 Jan 2025 18:00:18 +0000 Subject: [PATCH] fix:httpclient events bring the http client api closes to the websocket one --- hivemind_bus_client/http_client.py | 84 +++++++++++++++++++++++++----- 1 file changed, 70 insertions(+), 14 deletions(-) diff --git a/hivemind_bus_client/http_client.py b/hivemind_bus_client/http_client.py index 213e6a8..57e9cde 100644 --- a/hivemind_bus_client/http_client.py +++ b/hivemind_bus_client/http_client.py @@ -6,24 +6,23 @@ import pybase64 import requests +from Cryptodome.PublicKey import RSA from ovos_bus_client import Message as MycroftMessage, MessageBusClient as OVOSBusClient from ovos_bus_client.session import Session from ovos_utils.fakebus import FakeBus from ovos_utils.log import LOG -from Cryptodome.PublicKey import RSA from hivemind_bus_client.client import BinaryDataCallbacks +from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, + SupportedEncodings, SupportedCiphers) from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType, HiveMindBinaryPayloadType from hivemind_bus_client.protocol import HiveMindSlaveProtocol from hivemind_bus_client.serialization import get_bitstring, decode_bitstring from hivemind_bus_client.util import serialize_message -from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, - SupportedEncodings, SupportedCiphers) from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA - class HiveMindHTTPClient(threading.Thread): """ A client for the HiveMind HTTP server protocol. @@ -75,7 +74,17 @@ def __init__(self, key: Optional[str] = None, self.stopped = threading.Event() self.connected = threading.Event() self._handlers: Dict[str, List[Callable[[HiveMessage], None]]] = {} + self._agent_handlers: Dict[str, List[Callable[[MycroftMessage], None]]] = {} self.start() + self.wait_for_handshake() + + def wait_for_handshake(self, timeout=5): + self.connected.wait(timeout=timeout) + self.handshake_event.wait(timeout=timeout) + if not self.handshake_event.is_set(): + self.protocol.start_handshake() + self.wait_for_handshake() + time.sleep(1) # let server process our "hello" response @property def base_url(self) -> str: @@ -213,6 +222,12 @@ def _handle_hive_protocol(self, message: HiveMessage): handler(message) except Exception as e: LOG.error(f"Error in message handler: {handler} - {e}") + if message.msg_type == HiveMessageType.BUS and message.payload.msg_type in self._agent_handlers: + for handler in self._agent_handlers[message.payload.msg_type]: + try: + handler(message.payload) + except Exception as e: + LOG.error(f"Error in agent message handler: {handler} - {e}") # these are not supposed to come from server -> client if message.msg_type == HiveMessageType.ESCALATE: @@ -247,11 +262,21 @@ def on(self, event_name: str, func: Callable): self._handlers[event_name] = [] self._handlers[event_name].append(func) + def on_mycroft(self, event_name: str, func: Callable): + if event_name not in self._agent_handlers: + self._agent_handlers[event_name] = [] + self._agent_handlers[event_name].append(func) + def remove(self, event_name: str, func: Callable): if event_name in self._handlers: self._handlers[event_name] = [h for h in self._handlers[event_name] if h is not func] + def remove_mycroft(self, event_name: str, func: Callable): + if event_name in self._agent_handlers: + self._agent_handlers[event_name] = [h for h in self._agent_handlers[event_name] + if h is not func] + def emit(self, message: Union[MycroftMessage, HiveMessage], binary_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED): if not self.connected.is_set(): @@ -294,12 +319,11 @@ def emit(self, message: Union[MycroftMessage, HiveMessage], payload = serialize_message(message) if self.crypto_key: payload = encrypt_as_json(self.crypto_key, payload, - cipher=self.cipher, encoding=self.json_encoding) + cipher=self.cipher, encoding=self.json_encoding) url = f"{self.base_url}/send_message" return requests.post(url, data={"message": payload}, params={"authorization": self.auth}) - # targeted messages for nodes, asymmetric encryption def emit_intercom(self, message: Union[MycroftMessage, HiveMessage], pubkey: Union[str, bytes, RSA.RsaKey]): @@ -343,6 +367,7 @@ def disconnect(self) -> dict: url = f"{self.base_url}/disconnect" response = requests.post(url, params={"authorization": self.auth}) self.connected.clear() + self.handshake_event.clear() return response.json() def get_messages(self) -> List[str]: @@ -366,11 +391,17 @@ def get_binary_messages(self) -> List[bytes]: return [pybase64.b64decode(m) for m in response["b64_messages"]] - # Example usage: if __name__ == "__main__": - LOG.set_level("DEBUG") + from ovos_utils.log import init_service_logger + init_service_logger("HiveMindHTTP") + LOG.set_level("ERROR") + + + got_tts = threading.Event() + + # To handle binary data subclass BinaryDataCallbacks class BinaryDataHandler(BinaryDataCallbacks): def handle_receive_tts(self, bin_data: bytes, utterance: str, @@ -381,17 +412,42 @@ def handle_receive_tts(self, bin_data: bytes, print(f"utterance: {utterance}", f"lang: {lang}", f"file_name: {file_name}") # got 33836 bytes of TTS audio # utterance: hello world lang: en-US file_name: 5eb63bbbe01eeed093cb22bb8f5acdc3.wav + got_tts.set() + + # not passing key etc so it uses identity file client = HiveMindHTTPClient(host="http://localhost", port=5679, bin_callbacks=BinaryDataHandler()) - time.sleep(5) + # send HiveMessages as usual client.emit(HiveMessage(HiveMessageType.BUS, MycroftMessage("speak:synth", {"utterance": "hello world"}))) - time.sleep(6) - client.emit(HiveMessage(HiveMessageType.BUS, - MycroftMessage("recognizer_loop:utterance", - {"utterances": ["who is Isaac Newton"]}))) - time.sleep(20) \ No newline at end of file + + got_tts.wait() + + # to handle agent responses, use client.on_mycroft("event", handler) + answer = None + answered = threading.Event() + + def handle_speak(message: MycroftMessage): + global answer + answer = message.data['utterance'] + + def utt_handled(message: MycroftMessage): + answered.set() + + client.on_mycroft("speak", handle_speak) + client.on_mycroft("ovos.utterance.handled", utt_handled) + + + while True: + utt = input("> ") + client.emit(HiveMessage(HiveMessageType.BUS, + MycroftMessage("recognizer_loop:utterance", + {"utterances": [utt]}))) + answered.wait() + print(answer) + answered.clear() +