Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bring http client closes to websocket client #64

Merged
merged 4 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Hivemind Websocket Client
# Hivemind Bus Client

![logo](./logo.png)
Python client library for [hivemind-core](https://github.com/JarbasHiveMind/HiveMind-core)

## Install

Expand All @@ -10,36 +10,86 @@ pip install hivemind_bus_client

## Usage

via [hivemind-http-protocol](https://github.com/JarbasHiveMind/hivemind-http-protocol)

```python
from time import sleep
from ovos_bus_client import Message
from hivemind_bus_client import HiveMessageBusClient
from hivemind_bus_client.decorators import on_escalate, \
on_shared_bus, on_ping, on_broadcast, on_propagate, on_mycroft_message, \
on_registry_opcode, on_third_party, on_cascade, on_handshake, on_hello, \
on_rendezvous, on_hive_message, on_third_party, on_payload
from hivemind_bus_client.http_client import HiveMindHTTPClient

key = "super_secret_access_key"
crypto_key = "ivf1NQSkQNogWYyr"
# not passing key etc so it uses identity file
client = HiveMindHTTPClient(host="http://localhost", port=5679)
```

bus = HiveMessageBusClient(key, crypto_key=crypto_key, ssl=False)
via [hivemind-websocket-protocol](https://github.com/JarbasHiveMind/hivemind-websocket-protocol)
```python
from hivemind_bus_client.client import HiveMessageBusClient

bus.run_in_thread()
# not passing key etc so it uses identity file
client = HiveMessageBusClient(host="ws://localhost", port=5678)
```

### Example: Simple Chat

```python
import threading
from ovos_bus_client.message import Message
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.http_client import HiveMindHTTPClient

# not passing key etc so it uses identity file
client = HiveMindHTTPClient(host="http://localhost", port=5679)

@on_mycroft_message(payload_type="speak", bus=bus)
def on_speak(msg):
print(msg.data["utterance"])
# to handle agent responses, use client.on_mycroft("event", handler)
answered = threading.Event()

def handle_speak(message: Message):
print(message.data['utterance'])

mycroft_msg = Message("recognizer_loop:utterance",
{"utterances": ["tell me a joke"]})
bus.emit_mycroft(mycroft_msg)
def utt_handled(message: Message):
answered.set()

client.on_mycroft("speak", handle_speak)
client.on_mycroft("ovos.utterance.handled", utt_handled)

sleep(50)

bus.close()
while True:
utt = input("> ")
answered.clear()
client.emit(HiveMessage(HiveMessageType.BUS,
Message("recognizer_loop:utterance", {"utterances": [utt]})))
answered.wait()
```

### Example: Remote TTS

if server is running [hivemind-audio-binary-protocol](https://github.com/JarbasHiveMind/hivemind-audio-binary-protocol)

```python
from ovos_bus_client.message import Message
from hivemind_bus_client.client import BinaryDataCallbacks
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.http_client import HiveMindHTTPClient

# To handle binary data subclass BinaryDataCallbacks
class BinaryDataHandler(BinaryDataCallbacks):
def handle_receive_tts(self, bin_data: bytes,
utterance: str,
lang: str,
file_name: str):
# we can play it or save to file or whatever
print(f"got {len(bin_data)} bytes of TTS audio")
print(f"utterance: {utterance}", f"lang: {lang}", f"file_name: {file_name}")
# got 33836 bytes of TTS audio
# utterance: hello world lang: en-US file_name: 5eb63bbbe01eeed093cb22bb8f5acdc3.wav


# not passing key etc so it uses identity file
client = HiveMindHTTPClient(host="http://localhost", port=5679,
bin_callbacks=BinaryDataHandler())

# send HiveMessages as usual
client.emit(HiveMessage(HiveMessageType.BUS,
Message("speak:synth", {"utterance": "hello world"})))

```

## Cli Usage
Expand Down
84 changes: 70 additions & 14 deletions hivemind_bus_client/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@

import pybase64
import requests
from Cryptodome.PublicKey import RSA
from ovos_bus_client import Message as MycroftMessage, MessageBusClient as OVOSBusClient
from ovos_bus_client.session import Session
from ovos_utils.fakebus import FakeBus
from ovos_utils.log import LOG

from Cryptodome.PublicKey import RSA
from hivemind_bus_client.client import BinaryDataCallbacks
from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin,
SupportedEncodings, SupportedCiphers)
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.message import HiveMessage, HiveMessageType, HiveMindBinaryPayloadType
from hivemind_bus_client.protocol import HiveMindSlaveProtocol
from hivemind_bus_client.serialization import get_bitstring, decode_bitstring
from hivemind_bus_client.util import serialize_message
from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin,
SupportedEncodings, SupportedCiphers)
from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA



class HiveMindHTTPClient(threading.Thread):
"""
A client for the HiveMind HTTP server protocol.
Expand Down Expand Up @@ -75,7 +74,17 @@ def __init__(self, key: Optional[str] = None,
self.stopped = threading.Event()
self.connected = threading.Event()
self._handlers: Dict[str, List[Callable[[HiveMessage], None]]] = {}
self._agent_handlers: Dict[str, List[Callable[[MycroftMessage], None]]] = {}
self.start()
self.wait_for_handshake()

def wait_for_handshake(self, timeout=5):
self.connected.wait(timeout=timeout)
self.handshake_event.wait(timeout=timeout)
if not self.handshake_event.is_set():
self.protocol.start_handshake()
self.wait_for_handshake()
time.sleep(1) # let server process our "hello" response

@property
def base_url(self) -> str:
Expand Down Expand Up @@ -213,6 +222,12 @@ def _handle_hive_protocol(self, message: HiveMessage):
handler(message)
except Exception as e:
LOG.error(f"Error in message handler: {handler} - {e}")
if message.msg_type == HiveMessageType.BUS and message.payload.msg_type in self._agent_handlers:
for handler in self._agent_handlers[message.payload.msg_type]:
try:
handler(message.payload)
except Exception as e:
LOG.error(f"Error in agent message handler: {handler} - {e}")

# these are not supposed to come from server -> client
if message.msg_type == HiveMessageType.ESCALATE:
Expand Down Expand Up @@ -247,11 +262,21 @@ def on(self, event_name: str, func: Callable):
self._handlers[event_name] = []
self._handlers[event_name].append(func)

def on_mycroft(self, event_name: str, func: Callable):
if event_name not in self._agent_handlers:
self._agent_handlers[event_name] = []
self._agent_handlers[event_name].append(func)

def remove(self, event_name: str, func: Callable):
if event_name in self._handlers:
self._handlers[event_name] = [h for h in self._handlers[event_name]
if h is not func]

def remove_mycroft(self, event_name: str, func: Callable):
if event_name in self._agent_handlers:
self._agent_handlers[event_name] = [h for h in self._agent_handlers[event_name]
if h is not func]

def emit(self, message: Union[MycroftMessage, HiveMessage],
binary_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED):
if not self.connected.is_set():
Expand Down Expand Up @@ -294,12 +319,11 @@ def emit(self, message: Union[MycroftMessage, HiveMessage],
payload = serialize_message(message)
if self.crypto_key:
payload = encrypt_as_json(self.crypto_key, payload,
cipher=self.cipher, encoding=self.json_encoding)
cipher=self.cipher, encoding=self.json_encoding)

url = f"{self.base_url}/send_message"
return requests.post(url, data={"message": payload}, params={"authorization": self.auth})


# targeted messages for nodes, asymmetric encryption
def emit_intercom(self, message: Union[MycroftMessage, HiveMessage],
pubkey: Union[str, bytes, RSA.RsaKey]):
Expand Down Expand Up @@ -343,6 +367,7 @@ def disconnect(self) -> dict:
url = f"{self.base_url}/disconnect"
response = requests.post(url, params={"authorization": self.auth})
self.connected.clear()
self.handshake_event.clear()
return response.json()

def get_messages(self) -> List[str]:
Expand All @@ -366,11 +391,17 @@ def get_binary_messages(self) -> List[bytes]:
return [pybase64.b64decode(m) for m in response["b64_messages"]]



# Example usage:
if __name__ == "__main__":
LOG.set_level("DEBUG")
from ovos_utils.log import init_service_logger

init_service_logger("HiveMindHTTP")
LOG.set_level("ERROR")


got_tts = threading.Event()

# To handle binary data subclass BinaryDataCallbacks
class BinaryDataHandler(BinaryDataCallbacks):
def handle_receive_tts(self, bin_data: bytes,
utterance: str,
Expand All @@ -381,17 +412,42 @@ def handle_receive_tts(self, bin_data: bytes,
print(f"utterance: {utterance}", f"lang: {lang}", f"file_name: {file_name}")
# got 33836 bytes of TTS audio
# utterance: hello world lang: en-US file_name: 5eb63bbbe01eeed093cb22bb8f5acdc3.wav
got_tts.set()



# not passing key etc so it uses identity file
client = HiveMindHTTPClient(host="http://localhost", port=5679,
bin_callbacks=BinaryDataHandler())

time.sleep(5)
# send HiveMessages as usual
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("speak:synth",
{"utterance": "hello world"})))
time.sleep(6)
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("recognizer_loop:utterance",
{"utterances": ["who is Isaac Newton"]})))
time.sleep(20)

got_tts.wait()

# to handle agent responses, use client.on_mycroft("event", handler)
answer = None
answered = threading.Event()

def handle_speak(message: MycroftMessage):
global answer
answer = message.data['utterance']

def utt_handled(message: MycroftMessage):
answered.set()

client.on_mycroft("speak", handle_speak)
client.on_mycroft("ovos.utterance.handled", utt_handled)


while True:
utt = input("> ")
client.emit(HiveMessage(HiveMessageType.BUS,
MycroftMessage("recognizer_loop:utterance",
{"utterances": [utt]})))
answered.wait()
print(answer)
answered.clear()

Loading