diff --git a/custom_components/magentatv/__init__.py b/custom_components/magentatv/__init__.py index 35257bd..da586bf 100644 --- a/custom_components/magentatv/__init__.py +++ b/custom_components/magentatv/__init__.py @@ -3,6 +3,7 @@ For more details about this integration, please refer to https://github.com/xyaren/magentatv """ + from __future__ import annotations from asyncio import Lock diff --git a/custom_components/magentatv/api/client.py b/custom_components/magentatv/api/client.py index 0f316e9..8e59952 100644 --- a/custom_components/magentatv/api/client.py +++ b/custom_components/magentatv/api/client.py @@ -20,7 +20,7 @@ PairingTimeoutException, ) from .notify_server import NotifyServer -from .utils import magneta_hash +from .utils import magenta_hash PAIRING_EVENT_TIMEOUT = 5 PAIRING_ATTEMPTS = 3 @@ -44,9 +44,9 @@ def __init__( self._notify_server = notify_server - self._terminal_id = magneta_hash(instance_id) + self._terminal_id = magenta_hash(instance_id) - self._user_id = magneta_hash(user_id) + self._user_id = magenta_hash(user_id) self._requester = AiohttpRequester(http_headers={"User-Agent": "Homeassistant MagentaTV Integration"}) self._verification_code = None @@ -76,7 +76,7 @@ async def _on_event(self, changes): body = changes.get("messageBody") if "X-pairingCheck:" in body: pairing_code = changes.get("messageBody").removeprefix("X-pairingCheck:") - self._verification_code = magneta_hash(pairing_code + self._terminal_id + self._user_id) + self._verification_code = magenta_hash(pairing_code + self._terminal_id + self._user_id) self._pairing_event.set() async def _register_for_events(self): diff --git a/custom_components/magentatv/api/notify_server.py b/custom_components/magentatv/api/notify_server.py index c0f9567..dfce065 100644 --- a/custom_components/magentatv/api/notify_server.py +++ b/custom_components/magentatv/api/notify_server.py @@ -108,7 +108,7 @@ async def _async_subscribe_to_service(self, target, service: str, callback: Call self._subscription_registry[sid] = (target, service, callback) for changes in self._buffer.pop(sid, []): - callback(changes) + await callback(changes) return sid @@ -136,19 +136,23 @@ async def _async_subscribe(self, target, service) -> str: if adv_port is None: adv_port = self._listen_ip_port[1] - response = await self._requester.async_http_request( - http_request=HttpRequest( - method="SUBSCRIBE", - url=url, - headers={ - "NT": "upnp:event", - "TIMEOUT": f"Second-{self._subscription_timeout}", - "HOST": f"{target[0]}:{target[1]}", - "CALLBACK": f"", - }, - body=None, + try: + response = await self._requester.async_http_request( + http_request=HttpRequest( + method="SUBSCRIBE", + url=url, + headers={ + "NT": "upnp:event", + "TIMEOUT": f"Second-{self._subscription_timeout}", + "HOST": f"{target[0]}:{target[1]}", + "CALLBACK": f"", + }, + body=None, + ) ) - ) + except UpnpConnectionTimeoutError as ex: + LOGGER.error("Failed to subscribe %s on %s at %s", service, target, url, exc_info=ex) + raise ex assert response.status_code == 200 sid = response.headers["SID"] LOGGER.debug("Subscribed %s on %s at %s", sid, service, target) @@ -156,34 +160,41 @@ async def _async_subscribe(self, target, service) -> str: @wrap_exceptions async def _async_resubscribe(self, target, service, sid) -> str: - response = await self._requester.async_http_request( - http_request=HttpRequest( - method="SUBSCRIBE", - url=f"http://{target[0]}:{target[1]}/upnp/service/{service}/Event", - headers={ - "SID": sid, - "TIMEOUT": f"Second-{self._subscription_timeout}", - }, - body=None, + try: + response = await self._requester.async_http_request( + http_request=HttpRequest( + method="SUBSCRIBE", + url=f"http://{target[0]}:{target[1]}/upnp/service/{service}/Event", + headers={ + "SID": sid, + "TIMEOUT": f"Second-{self._subscription_timeout}", + }, + body=None, + ) ) - ) - assert response.status_code == 200 - return response.headers["SID"] + assert response.status_code == 200 + return response.headers["SID"] + except UpnpConnectionTimeoutError as ex: + LOGGER.error("Failed to resubscribe %s on %s at %s", sid, service, target, exc_info=ex) + raise ex @wrap_exceptions async def _async_unsubscribe(self, target, service, sid) -> str: - response = await self._requester.async_http_request( - http_request=HttpRequest( - method="UNSUBSCRIBE", - url=f"http://{target[0]}:{target[1]}/upnp/service/{service}/Event", - headers={ - "SID": sid, - }, - body=None, + try: + response = await self._requester.async_http_request( + http_request=HttpRequest( + method="UNSUBSCRIBE", + url=f"http://{target[0]}:{target[1]}/upnp/service/{service}/Event", + headers={ + "SID": sid, + }, + body=None, + ) ) - ) - assert response.status_code in [200, 412] - LOGGER.debug("Unsubscribed %s on %s at %s", sid, service, target) + assert response.status_code in [200, 412] + LOGGER.debug("Unsubscribed %s on %s at %s", sid, service, target) + except UpnpConnectionTimeoutError as ex: + LOGGER.warning("Failed to unsubscribe %s on %s at %s", sid, service, target, exc_info=ex) async def _async_has_subscriptions(self) -> bool: async with self.subscription_lock: diff --git a/custom_components/magentatv/api/utils.py b/custom_components/magentatv/api/utils.py index 0582e10..3da4c4f 100644 --- a/custom_components/magentatv/api/utils.py +++ b/custom_components/magentatv/api/utils.py @@ -1,5 +1,5 @@ import hashlib -def magneta_hash(data: str) -> str: +def magenta_hash(data: str) -> str: return hashlib.md5(data.encode("UTF-8")).hexdigest().upper() diff --git a/custom_components/magentatv/config_flow.py b/custom_components/magentatv/config_flow.py index 49215a8..0b770be 100644 --- a/custom_components/magentatv/config_flow.py +++ b/custom_components/magentatv/config_flow.py @@ -26,6 +26,14 @@ from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers import instance_id from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.helpers.service_info.ssdp import ( + ATTR_UPNP_FRIENDLY_NAME, + ATTR_UPNP_MANUFACTURER, + ATTR_UPNP_MODEL_NAME, + ATTR_UPNP_MODEL_NUMBER, + ATTR_UPNP_UDN, + SsdpServiceInfo, +) from custom_components.magentatv import async_get_notification_server from custom_components.magentatv.api.exceptions import PairingTimeoutException @@ -70,7 +78,7 @@ class MagentaTvFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): def __init__(self) -> None: """Initialize the flow.""" - self._discoveries: dict[str, ssdp.SsdpServiceInfo] = {} + self._discoveries: dict[str, SsdpServiceInfo] = {} self.friendly_name: str | None = None self.model_name: str | None = None @@ -111,7 +119,7 @@ async def async_step_user(self, user_input: FlowInput = None) -> FlowResult: return await self.async_step_manual() self._discoveries = { - discovery.upnp.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) + discovery.upnp.get(ATTR_UPNP_FRIENDLY_NAME) or cast(str, urlparse(discovery.ssdp_location).hostname): discovery for discovery in discoveries } @@ -120,7 +128,7 @@ async def async_step_user(self, user_input: FlowInput = None) -> FlowResult: return self.async_show_form(step_id="user", data_schema=data_schema, last_step=False) - async def _async_get_discoveries(self) -> list[ssdp.SsdpServiceInfo]: + async def _async_get_discoveries(self) -> list[SsdpServiceInfo]: """Get list of unconfigured DLNA devices discovered by SSDP.""" # Get all compatible devices from ssdp's cache @@ -206,11 +214,11 @@ async def async_step_unignore(self, user_input: Mapping[str, Any]) -> FlowResult return self.async_abort(reason="discovery_error") def _set_from_upnp(self, device_info: Mapping[str, Any]): - self._udn = device_info.get(ssdp.ATTR_UPNP_UDN) - self.friendly_name = device_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) - self.model_name = device_info.get(ssdp.ATTR_UPNP_MODEL_NAME) - self.model_number = device_info.get(ssdp.ATTR_UPNP_MODEL_NUMBER) - self.manufacturer = device_info.get(ssdp.ATTR_UPNP_MANUFACTURER) + self._udn = device_info.get(ATTR_UPNP_UDN) + self.friendly_name = device_info.get(ATTR_UPNP_FRIENDLY_NAME) + self.model_name = device_info.get(ATTR_UPNP_MODEL_NAME) + self.model_number = device_info.get(ATTR_UPNP_MODEL_NUMBER) + self.manufacturer = device_info.get(ATTR_UPNP_MANUFACTURER) assert self._udn is not None assert self.friendly_name is not None @@ -220,7 +228,7 @@ def _set_from_upnp(self, device_info: Mapping[str, Any]): async def _async_set_info_from_discovery( self, - discovery_info: ssdp.SsdpServiceInfo, + discovery_info: SsdpServiceInfo, raise_on_progress: bool = True, abort_if_configured: bool = True, ) -> None: @@ -256,7 +264,7 @@ async def _async_set_info_from_discovery( return await self.async_step_enter_user_id() - async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult: + async def async_step_ssdp(self, discovery_info: SsdpServiceInfo) -> FlowResult: """Handle ssdp discovery flow.""" LOGGER.debug("async_step_ssdp %s", discovery_info) diff --git a/tests/api/test_utils.py b/tests/api/test_utils.py index 40060a0..8c40997 100644 --- a/tests/api/test_utils.py +++ b/tests/api/test_utils.py @@ -1,5 +1,5 @@ -from custom_components.magentatv.api.utils import magneta_hash +from custom_components.magentatv.api.utils import magenta_hash def test_hash_function(): - assert magneta_hash("Test") == "0CBC6611F5540BD0809A388DC95A615B" + assert magenta_hash("Test") == "0CBC6611F5540BD0809A388DC95A615B"