diff --git a/api/config.py b/api/config.py index 5929da9..77f6187 100644 --- a/api/config.py +++ b/api/config.py @@ -1,4 +1,5 @@ import os +import re class ConfigManager: def __init__(self, file_path: str): @@ -78,3 +79,84 @@ def set(self, key, value): lines.append(new_value) with open(self.file_path, 'w') as f: f.writelines(lines) + +class ConfigUci: + def __init__(self, file_path: str): + self.file_path = file_path + if not os.path.exists(self.file_path): + raise FileNotFoundError(f"File not found: {self.file_path}") + self.main_section = None + self.data = dict() + self.read() + + def __getattr__(self, key): + if self.main_section: + return self.data[self.main_section].get(key.lower()) + return self.data.get(key) + + def _process_value(self, value): + """ Transform value from string to type """ + if value.startswith('"') and value.endswith('"'): + value = value[1:-1] + if value.lower() == 'true': + return True + if value.lower() == 'false': + return False + if value.isdigit(): + return int(value) + return value + + def read(self): + result = {} + current_section = None + + with open(self.file_path, 'r') as f: + for line in f: + line = line.strip() + if line.startswith('#') or not line: + continue + if line.startswith('config'): + current_section = line.split()[1].strip("'") + if self.main_section is None: + self.main_section = current_section + result[current_section] = {} + elif line.startswith('option') and current_section: + match = re.match(r"option\s+'?(\w+)'?\s+'(.+)'", line) + if not match: + continue + key, value = match.groups() + key = key.lower() + value = self._process_value(value.strip("'")) + result[current_section][key] = value + + self.data = result + + def to_dict(self) -> dict: + if self.main_section: + return self.data[self.main_section] + return self.data + + def get(self, key: str, section: str = ""): + if self.main_section and not section: + section = self.main_section + return self.data[section].get(key) + +# def set(self, section, key, value): +# config_dict = self.to_dict() +# if section not in config_dict: +# config_dict[section] = {} +# config_dict[section][key] = value +# self._write_config(config_dict) +# +# def _write_config(self, config_dict): +# lines = [] +# for section, options in config_dict.items(): +# lines.append(f"config '{section}'\n") +# for key, value in options.items(): +# if isinstance(value, bool): +# value = str(value).lower() +# if ' ' in value: +# value = f'"{value}"' +# lines.append(f"\toption {key} '{value}'\n") +# with open(self.file_path, 'w') as f: +# f.writelines(lines) \ No newline at end of file diff --git a/api/const.py b/api/const.py index 7d42259..864658d 100644 --- a/api/const.py +++ b/api/const.py @@ -18,6 +18,8 @@ config_listener = '/data/listener' config_tts = '/data/tts.conf' +mico_version = '/usr/share/mico/version' +board_info = '/data/etc/binfo' wakewords_porcupine = '/usr/share/porcupine/keywords' diff --git a/api/main.py b/api/main.py index d5d4e2f..f2fce57 100644 --- a/api/main.py +++ b/api/main.py @@ -3,9 +3,13 @@ import re import requests import subprocess +import signal +import json +import base64 +import time -from config import ConfigManager -from utils import get_ip_address +from config import ConfigManager, ConfigUci +from utils import get_ip_address, get_wifi_mac_address, get_bt_mac_address, get_device_id import const hostname = os.uname()[1] @@ -14,14 +18,38 @@ config = ConfigManager(const.config_listener) config_tts = ConfigManager(const.config_tts) +system_version = ConfigUci(const.mico_version) -@app.route('/') -def index(): - return app.send_static_file('index.html') +def get_hass_token() -> str: + token = config.HA_TOKEN + if not token: + return "" + try: + token_parts = token.split('.') + if len(token_parts) != 3: + raise ValueError('Invalid HA token format') + + payload = token_parts[1] + payload += '=' * (4 - len(payload) % 4) # Add padding if necessary + decoded_payload = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8')) + + if decoded_payload['exp'] < time.time(): + if config.HA_REFRESH_TOKEN: + if home_assistant_refresh_token(): + token = config.HA_TOKEN + else: + raise ValueError('Failed to refresh token') + else: + raise ValueError('Token expired and no refresh token available') + return token + except Exception as e: + raise ValueError(f'Failed to get HA token: {e}') +@app.route('/') @app.route('/app.js') -def app_js(): - return app.send_static_file('app.js') +def files(): + file = 'index.html' if request.path == '/' else request.path.lstrip('/') + return app.send_static_file(file) @app.get('/config') def get_config(): @@ -43,8 +71,8 @@ def set_config(): updated = True if request.form.get('tts_language'): - config_tts.LANGUAGE = request.form.get('tts_language') - updated = True + if config_tts.set("LANGUAGE", request.form.get('tts_language')) is not True: + updated = True if updated: service_path = os.path.join(const.services_dir, 'listener') @@ -52,6 +80,28 @@ def set_config(): return redirect('/', code=302) +@app.get('/config/stt') +def get_stt_providers(): + """ Get all state entities from HA, and filter for STT ones only """ + token = get_hass_token() + url = f'{config.HA_URL}/api/states' + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json', + } + + req = requests.get(url, headers=headers) + req.raise_for_status() + + providers = list() + data = req.json() + for entity in data: + if entity['entity_id'].startswith('stt.'): + entry = {"entity_id": entity['entity_id'], "name": entity['attributes'].get('friendly_name', entity['entity_id'])} + providers.append(entry) + + return jsonify({'data': {'providers': providers, 'current': config.get('HA_STT_PROVIDER') or None}}) + @app.get('/config/wakewords') def get_wakewords(): """ Get the wakewords from Porcupine, remove the file name, and only get the wakeword name """ @@ -94,6 +144,21 @@ def parse_avahi_output(output): except Exception as e: return jsonify({'hostname': hostname, 'error': str(e)}), 500 +@app.get('/device/info') +def device_info(): + data = { + 'hostname': hostname, + 'ip': speaker_ip, + 'model': system_version.HARDWARE, + 'serial_number': get_device_id(), + 'wifi': get_wifi_mac_address(), + 'bluetooth': get_bt_mac_address(), + 'version': system_version.to_dict(), + } + response = jsonify({'data': data}) + response.cache_control.max_age = 3600 + return response + @app.route('/mute') @app.route('/unmute') def manage_listener(): @@ -103,6 +168,20 @@ def manage_listener(): os.system(f'{silent} {service} {action}') return "" +@app.route('/wake') +def trigger_wake(): + process_name = '/usr/bin/porcupine' + try: + result = subprocess.run(['pgrep', '-x', process_name], capture_output=True, text=True) + if result.returncode == 0: + pid = result.stdout.strip() + os.kill(int(pid), signal.SIGINT) + return "", 200 + else: + return "", 425 + except Exception as e: + return jsonify({'error': str(e)}), 500 + @app.post('/auth') def home_assistant_auth(): ha_url = request.form.get('url', '').rstrip('/') diff --git a/api/utils.py b/api/utils.py index 0a3118c..1237e6c 100644 --- a/api/utils.py +++ b/api/utils.py @@ -1,11 +1,50 @@ import socket import fcntl import struct +import subprocess +import re -def get_ip_address(ifname): +def get_ip_address(ifname) -> str: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15].encode('utf-8')) )[20:24]) + +def get_unify_key(key: str) -> str: + try: + with open(f'/sys/class/unifykeys/name', 'w') as f: + f.write(key) + with open(f'/sys/class/unifykeys/read', 'r') as f: + return f.read().strip() + except IOError as e: + pass + return "" + +def get_device_id() -> str: + did = get_unify_key('deviceid') + if did: + return did + return "" + +def get_wifi_mac_address() -> str: + mac = get_unify_key('mac_wifi') + if mac: + return mac.upper() + return "" + +def get_bt_mac_address() -> str: + mac = get_unify_key('mac_bt') + if mac: + return mac.upper() + + try: + result = subprocess.run('hciconfig hci0'.split(' '), capture_output=True, text=True) + if result.returncode == 0: + match = re.search(r'BD Address: ([0-9A-F:]{17})', result.stdout) + if match: + return match.group(1) + except Exception as e: + pass + return "" \ No newline at end of file