Skip to content

Commit

Permalink
fix:httpclient events
Browse files Browse the repository at this point in the history
bring the http client api closes to the websocket one
  • Loading branch information
JarbasAl committed Jan 8, 2025
1 parent 54d3e97 commit 3471d80
Showing 1 changed file with 70 additions and 14 deletions.
84 changes: 70 additions & 14 deletions hivemind_bus_client/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@

import pybase64
import requests
from Cryptodome.PublicKey import RSA
from ovos_bus_client import Message as MycroftMessage, MessageBusClient as OVOSBusClient
from ovos_bus_client.session import Session
from ovos_utils.fakebus import FakeBus
from ovos_utils.log import LOG

from Cryptodome.PublicKey import RSA
from hivemind_bus_client.client import BinaryDataCallbacks
from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin,
SupportedEncodings, SupportedCiphers)
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.message import HiveMessage, HiveMessageType, HiveMindBinaryPayloadType
from hivemind_bus_client.protocol import HiveMindSlaveProtocol
from hivemind_bus_client.serialization import get_bitstring, decode_bitstring
from hivemind_bus_client.util import serialize_message
from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin,
SupportedEncodings, SupportedCiphers)
from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA



class HiveMindHTTPClient(threading.Thread):
"""
A client for the HiveMind HTTP server protocol.
Expand Down Expand Up @@ -75,7 +74,17 @@ def __init__(self, key: Optional[str] = None,
self.stopped = threading.Event()
self.connected = threading.Event()
self._handlers: Dict[str, List[Callable[[HiveMessage], None]]] = {}
self._agent_handlers: Dict[str, List[Callable[[MycroftMessage], None]]] = {}
self.start()
self.wait_for_handshake()

def wait_for_handshake(self, timeout=5):
self.connected.wait(timeout=timeout)
self.handshake_event.wait(timeout=timeout)
if not self.handshake_event.is_set():
self.protocol.start_handshake()
self.wait_for_handshake()
time.sleep(1) # let server process our "hello" response

@property
def base_url(self) -> str:
Expand Down Expand Up @@ -213,6 +222,12 @@ def _handle_hive_protocol(self, message: HiveMessage):
handler(message)
except Exception as e:
LOG.error(f"Error in message handler: {handler} - {e}")
if message.msg_type == HiveMessageType.BUS and message.payload.msg_type in self._agent_handlers:
for handler in self._agent_handlers[message.payload.msg_type]:
try:
handler(message.payload)
except Exception as e:
LOG.error(f"Error in agent message handler: {handler} - {e}")

# these are not supposed to come from server -> client
if message.msg_type == HiveMessageType.ESCALATE:
Expand Down Expand Up @@ -247,11 +262,21 @@ def on(self, event_name: str, func: Callable):
self._handlers[event_name] = []
self._handlers[event_name].append(func)

def on_mycroft(self, event_name: str, func: Callable):
if event_name not in self._agent_handlers:
self._agent_handlers[event_name] = []
self._agent_handlers[event_name].append(func)

def remove(self, event_name: str, func: Callable):
if event_name in self._handlers:
self._handlers[event_name] = [h for h in self._handlers[event_name]
if h is not func]

def remove_mycroft(self, event_name: str, func: Callable):
if event_name in self._agent_handlers:
self._agent_handlers[event_name] = [h for h in self._agent_handlers[event_name]
if h is not func]

def emit(self, message: Union[MycroftMessage, HiveMessage],
binary_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED):
if not self.connected.is_set():
Expand Down Expand Up @@ -294,12 +319,11 @@ def emit(self, message: Union[MycroftMessage, HiveMessage],
payload = serialize_message(message)
if self.crypto_key:
payload = encrypt_as_json(self.crypto_key, payload,
cipher=self.cipher, encoding=self.json_encoding)
cipher=self.cipher, encoding=self.json_encoding)

url = f"{self.base_url}/send_message"
return requests.post(url, data={"message": payload}, params={"authorization": self.auth})


# targeted messages for nodes, asymmetric encryption
def emit_intercom(self, message: Union[MycroftMessage, HiveMessage],
pubkey: Union[str, bytes, RSA.RsaKey]):
Expand Down Expand Up @@ -343,6 +367,7 @@ def disconnect(self) -> dict:
url = f"{self.base_url}/disconnect"
response = requests.post(url, params={"authorization": self.auth})
self.connected.clear()
self.handshake_event.clear()
return response.json()

def get_messages(self) -> List[str]:
Expand All @@ -366,11 +391,17 @@ def get_binary_messages(self) -> List[bytes]:
return [pybase64.b64decode(m) for m in response["b64_messages"]]



# Example usage:
if __name__ == "__main__":
LOG.set_level("DEBUG")
from ovos_utils.log import init_service_logger

init_service_logger("HiveMindHTTP")
LOG.set_level("ERROR")


got_tts = threading.Event()

# To handle binary data subclass BinaryDataCallbacks
class BinaryDataHandler(BinaryDataCallbacks):
def handle_receive_tts(self, bin_data: bytes,
utterance: str,
Expand All @@ -381,17 +412,42 @@ def handle_receive_tts(self, bin_data: bytes,
print(f"utterance: {utterance}", f"lang: {lang}", f"file_name: {file_name}")
# got 33836 bytes of TTS audio
# utterance: hello world lang: en-US file_name: 5eb63bbbe01eeed093cb22bb8f5acdc3.wav
got_tts.set()



# not passing key etc so it uses identity file
client = HiveMindHTTPClient(host="http://localhost", port=5679,
bin_callbacks=BinaryDataHandler())

time.sleep(5)
# send HiveMessages as usual
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("speak:synth",
{"utterance": "hello world"})))
time.sleep(6)
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("recognizer_loop:utterance",
{"utterances": ["who is Isaac Newton"]})))
time.sleep(20)

got_tts.wait()

# to handle agent responses, use client.on_mycroft("event", handler)
answer = None
answered = threading.Event()

def handle_speak(message: MycroftMessage):
global answer
answer = message.data['utterance']

def utt_handled(message: MycroftMessage):
answered.set()

client.on_mycroft("speak", handle_speak)
client.on_mycroft("ovos.utterance.handled", utt_handled)


while True:
utt = input("> ")
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("recognizer_loop:utterance",
{"utterances": [utt]})))
answered.wait()
print(answer)
answered.clear()

0 comments on commit 3471d80

Please sign in to comment.