From d8165cc2295be9511905c2de82d19799c2fba997 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 18:03:59 +0200 Subject: [PATCH 1/7] Move message processing to its own function --- aiowebostv/webos_client.py | 101 ++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 33f368d..7e278ad 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -77,6 +77,8 @@ def __init__( self._volume_step_delay: timedelta | None = None self._loop = asyncio.get_running_loop() self._media_state: list[dict[str, Any]] = [] + self.callback_queues: dict[int, Queue[dict[str, Any]]] = {} + self.callback_tasks: dict[int, Task] = {} async def connect(self) -> bool: """Connect to webOS TV device.""" @@ -196,11 +198,7 @@ async def connect_handler(self, res: Future) -> None: self.callbacks = {} self.futures = {} - handler_tasks.add( - asyncio.create_task( - self.consumer_handler(main_ws, self.callbacks, self.futures) - ) - ) + handler_tasks.add(asyncio.create_task(self.consumer_handler(main_ws))) self.connection = main_ws # open additional connection needed to send button commands @@ -211,7 +209,7 @@ async def connect_handler(self, res: Future) -> None: inputsockpath = sockres["socketPath"] input_ws = await self._ws_connect(inputsockpath) handler_tasks.add( - asyncio.create_task(self.consumer_handler(input_ws, None, None)) + asyncio.create_task(self.input_consumer_handler(input_ws)) ) self.input_connection = input_ws @@ -258,6 +256,10 @@ async def connect_handler(self, res: Future) -> None: if not res.done(): res.set_exception(ex) finally: + for callback_task in self.callback_tasks.values(): + if not callback_task.done(): + callback_task.cancel() + for task in handler_tasks: if not task.done(): task.cancel() @@ -266,6 +268,11 @@ async def connect_handler(self, res: Future) -> None: future.cancel() closeout = set() + + callback_tasks = set(self.callback_tasks.values()) + if callback_tasks: + closeout.update(callback_tasks) + closeout.update(handler_tasks) if main_ws is not None: @@ -294,6 +301,8 @@ async def connect_handler(self, res: Future) -> None: self._hello_info = {} self._sound_output = None self._media_state = [] + self.callback_queues = {} + self.callback_tasks = {} for callback in self.state_update_callbacks: closeout.add(asyncio.create_task(callback(self))) @@ -320,51 +329,45 @@ async def callback_handler( if future is not None and not future.done(): future.set_result(msg) - async def consumer_handler( - self, - web_socket: ClientWebSocketResponse, - callbacks: dict[int, Callable] | None, - futures: dict[int, Future] | None, - ) -> None: - """Callbacks consumer handler.""" - callback_queues: dict[int, Queue[dict[str, Any]]] = {} - callback_tasks: dict[int, Task] = {} + async def _process_text_message(self, data: str) -> None: + """Process text message.""" + if not self.callbacks and not self.futures: + return - try: - async for raw_msg in web_socket: - _LOGGER.debug("recv(%s): %s", self.host, raw_msg) - if raw_msg.type is not WSMsgType.TEXT: - break - - if callbacks or futures: - msg = json.loads(raw_msg.data) - uid = msg.get("id") - callback = self.callbacks.get(uid) - future = self.futures.get(uid) - if callback is not None: - if uid not in callback_tasks: - queue: Queue[dict[str, Any]] = asyncio.Queue() - callback_queues[uid] = queue - callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, future) - ) - callback_queues[uid].put_nowait(msg) - elif future is not None and not future.done(): - self.futures[uid].set_result(msg) + msg = json.loads(data) + uid = msg.get("id") + callback = self.callbacks.get(uid) + future = self.futures.get(uid) + if callback is not None: + if uid not in self.callback_tasks: + queue: Queue[dict[str, Any]] = asyncio.Queue() + self.callback_queues[uid] = queue + self.callback_tasks[uid] = asyncio.create_task( + self.callback_handler(queue, callback, future) + ) + self.callback_queues[uid].put_nowait(msg) + elif future is not None and not future.done(): + self.futures[uid].set_result(msg) - finally: - for task in callback_tasks.values(): - if not task.done(): - task.cancel() + async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: + """Callbacks consumer handler.""" + async for raw_msg in web_socket: + _LOGGER.debug("recv(%s): %s", self.host, raw_msg) + if raw_msg.type is not WSMsgType.TEXT: + break - tasks = set(callback_tasks.values()) + await self._process_text_message(raw_msg.data) - if tasks: - closeout_task = asyncio.create_task(asyncio.wait(tasks)) + async def input_consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: + """Input consumer handler. - while not closeout_task.done(): - with suppress(asyncio.CancelledError): - await asyncio.shield(closeout_task) + We are not expecting any messages from the input connection. + This is just to keep the connection alive. + """ + async for raw_msg in web_socket: + _LOGGER.debug("input recv(%s): %s", self.host, raw_msg) + if raw_msg.type is not WSMsgType.TEXT: + break # manage state @property @@ -480,11 +483,7 @@ def clear_state_update_callbacks(self) -> None: async def do_state_update_callbacks(self) -> None: """Call user state update callback.""" - callbacks = set() - for callback in self.state_update_callbacks: - callbacks.add(callback(self)) - - if callbacks: + if callbacks := {callback(self) for callback in self.state_update_callbacks}: await asyncio.gather(*callbacks) async def set_power_state(self, payload: dict[str, bool | str]) -> None: From c6500b6495365daeb50f6346fafaf6eb23c9eb12 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 20:22:16 +0200 Subject: [PATCH 2/7] pre-create the queue --- aiowebostv/webos_client.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 7e278ad..867f74d 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -336,17 +336,9 @@ async def _process_text_message(self, data: str) -> None: msg = json.loads(data) uid = msg.get("id") - callback = self.callbacks.get(uid) - future = self.futures.get(uid) - if callback is not None: - if uid not in self.callback_tasks: - queue: Queue[dict[str, Any]] = asyncio.Queue() - self.callback_queues[uid] = queue - self.callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, future) - ) - self.callback_queues[uid].put_nowait(msg) - elif future is not None and not future.done(): + if queue := self.callback_queues.get(uid): + queue.put_nowait(msg) + elif self.futures[uid] is not None: self.futures[uid].set_result(msg) async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: @@ -689,12 +681,24 @@ async def subscribe( uid = self.command_count self.command_count += 1 self.callbacks[uid] = callback + queue: Queue[dict[str, Any]] = asyncio.Queue() + self.callback_queues[uid] = queue + self.callback_tasks[uid] = asyncio.create_task( + self.callback_handler(queue, callback, self.futures.get(uid)) + ) try: return await self.request( uri, payload=payload, cmd_type="subscribe", uid=uid ) except Exception: del self.callbacks[uid] + task = self.callback_tasks.pop(uid) + if not task.done(): + task.cancel() + while not task.done(): + with suppress(asyncio.CancelledError): + await asyncio.shield(task) + del self.callback_queues[uid] raise async def input_command(self, message: str) -> None: From 41298740fb3167d26016018fc8881c58e0200e62 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 22:19:19 +0200 Subject: [PATCH 3/7] pre-create a future --- aiowebostv/webos_client.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 867f74d..430bfb6 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -318,7 +318,7 @@ async def connect_handler(self, res: Future) -> None: async def callback_handler( queue: Queue[dict[str, Any]], callback: Callable, - future: Future[dict[str, Any]] | None, + future: Future[dict[str, Any]], ) -> None: """Handle callbacks.""" with suppress(asyncio.CancelledError): @@ -326,7 +326,7 @@ async def callback_handler( msg = await queue.get() payload = msg.get("payload") await callback(payload) - if future is not None and not future.done(): + if not future.done(): future.set_result(msg) async def _process_text_message(self, data: str) -> None: @@ -336,10 +336,12 @@ async def _process_text_message(self, data: str) -> None: msg = json.loads(data) uid = msg.get("id") + # if we have a callback for this message, put it in the queue + # let the callback handle the message and mark the future as done if queue := self.callback_queues.get(uid): queue.put_nowait(msg) - elif self.futures[uid] is not None: - self.futures[uid].set_result(msg) + elif future := self.futures.get(uid): + future.set_result(msg) async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: """Callbacks consumer handler.""" @@ -634,8 +636,10 @@ async def request( if uid is None: uid = self.command_count self.command_count += 1 - res = self._loop.create_future() - self.futures[uid] = res + res = self._loop.create_future() + self.futures[uid] = res + else: + res = self.futures[uid] try: await self.command(cmd_type, uri, payload, uid) except (asyncio.CancelledError, WebOsTvCommandError): @@ -677,14 +681,19 @@ async def request( async def subscribe( self, callback: Callable, uri: str, payload: dict[str, Any] | None = None ) -> dict[str, Any]: - """Subscribe to updates.""" + """Subscribe to updates. + + Subsciption use a fixed uid so we need to pre-create a future, + Create a queue to store the messages and a task to handle the messages. + """ uid = self.command_count self.command_count += 1 + self.futures[uid] = future = self._loop.create_future() self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() self.callback_queues[uid] = queue self.callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, self.futures.get(uid)) + self.callback_handler(queue, callback, future) ) try: return await self.request( From d19372f20f86cf34b5844b305262d866b3fd9c68 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 23:08:58 +0200 Subject: [PATCH 4/7] Move blocks to named function --- aiowebostv/webos_client.py | 44 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 430bfb6..d4baf25 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -678,16 +678,12 @@ async def request( return payload - async def subscribe( - self, callback: Callable, uri: str, payload: dict[str, Any] | None = None - ) -> dict[str, Any]: - """Subscribe to updates. + async def create_subscription_handler(self, uid: int, callback: Callable) -> None: + """Create a subscription handler for a given uid. - Subsciption use a fixed uid so we need to pre-create a future, - Create a queue to store the messages and a task to handle the messages. + Create a queue to store the messages, a task to handle the messages + and a future to signal first subscription update processed. """ - uid = self.command_count - self.command_count += 1 self.futures[uid] = future = self._loop.create_future() self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() @@ -695,19 +691,35 @@ async def subscribe( self.callback_tasks[uid] = asyncio.create_task( self.callback_handler(queue, callback, future) ) + + async def delete_subscription_handler(self, uid: int) -> None: + """Delete a subscription handler for a given uid.""" + del self.callbacks[uid] + task = self.callback_tasks.pop(uid) + if not task.done(): + task.cancel() + while not task.done(): + with suppress(asyncio.CancelledError): + await asyncio.shield(task) + del self.callback_queues[uid] + + async def subscribe( + self, callback: Callable, uri: str, payload: dict[str, Any] | None = None + ) -> dict[str, Any]: + """Subscribe to updates. + + Subsciption use a fixed uid, pre-create a future and a handler. + """ + uid = self.command_count + self.command_count += 1 + await self.create_subscription_handler(uid, callback) + try: return await self.request( uri, payload=payload, cmd_type="subscribe", uid=uid ) except Exception: - del self.callbacks[uid] - task = self.callback_tasks.pop(uid) - if not task.done(): - task.cancel() - while not task.done(): - with suppress(asyncio.CancelledError): - await asyncio.shield(task) - del self.callback_queues[uid] + await self.delete_subscription_handler(uid) raise async def input_command(self, message: str) -> None: From c82547d15cebe25a7bf8cdc0d65d5c37c9d20cf7 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Mon, 27 Jan 2025 00:34:50 +0200 Subject: [PATCH 5/7] do not store subsciption callbacks --- aiowebostv/webos_client.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index d4baf25..12a9e57 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -56,7 +56,6 @@ def __init__( self.connect_result: Future[bool] | None = None self.connection: ClientWebSocketResponse | None = None self.input_connection: ClientWebSocketResponse | None = None - self.callbacks: dict[int, Callable] = {} self.futures: dict[int, Future[dict[str, Any]]] = {} self._power_state: dict[str, Any] = {} self._current_app_id: str | None = None @@ -195,7 +194,8 @@ async def connect_handler(self, res: Future) -> None: error = "Client key not set, pairing failed." raise WebOsTvPairError(error) - self.callbacks = {} + self.callback_queues = {} + self.callback_tasks = {} self.futures = {} handler_tasks.add(asyncio.create_task(self.consumer_handler(main_ws))) @@ -301,8 +301,6 @@ async def connect_handler(self, res: Future) -> None: self._hello_info = {} self._sound_output = None self._media_state = [] - self.callback_queues = {} - self.callback_tasks = {} for callback in self.state_update_callbacks: closeout.add(asyncio.create_task(callback(self))) @@ -329,11 +327,8 @@ async def callback_handler( if not future.done(): future.set_result(msg) - async def _process_text_message(self, data: str) -> None: + def _process_text_message(self, data: str) -> None: """Process text message.""" - if not self.callbacks and not self.futures: - return - msg = json.loads(data) uid = msg.get("id") # if we have a callback for this message, put it in the queue @@ -350,7 +345,7 @@ async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: if raw_msg.type is not WSMsgType.TEXT: break - await self._process_text_message(raw_msg.data) + self._process_text_message(raw_msg.data) async def input_consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: """Input consumer handler. @@ -685,7 +680,6 @@ async def create_subscription_handler(self, uid: int, callback: Callable) -> Non and a future to signal first subscription update processed. """ self.futures[uid] = future = self._loop.create_future() - self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() self.callback_queues[uid] = queue self.callback_tasks[uid] = asyncio.create_task( @@ -694,7 +688,6 @@ async def create_subscription_handler(self, uid: int, callback: Callable) -> Non async def delete_subscription_handler(self, uid: int) -> None: """Delete a subscription handler for a given uid.""" - del self.callbacks[uid] task = self.callback_tasks.pop(uid) if not task.done(): task.cancel() From 15715e85f03ef9ccdbaf37de7005ebc1705f6874 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Mon, 27 Jan 2025 22:12:21 +0200 Subject: [PATCH 6/7] Refactor connect_handler --- aiowebostv/webos_client.py | 352 ++++++++++++++++++++----------------- ruff.toml | 5 - 2 files changed, 191 insertions(+), 166 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 12a9e57..dbfc3d2 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -78,6 +78,7 @@ def __init__( self._media_state: list[dict[str, Any]] = [] self.callback_queues: dict[int, Queue[dict[str, Any]]] = {} self.callback_tasks: dict[int, Task] = {} + self._rx_tasks: set[Task] = set() async def connect(self) -> bool: """Connect to webOS TV device.""" @@ -134,183 +135,212 @@ async def close_client_session(self) -> None: self.created_client_session = False self.client_session = None - async def connect_handler(self, res: Future) -> None: - """Handle connection for webOS TV.""" - handler_tasks: set[Task] = set() - main_ws: ClientWebSocketResponse | None = None - input_ws: ClientWebSocketResponse | None = None + async def _create_main_ws(self) -> ClientWebSocketResponse: + """Create main websocket connection. + Try using ws:// and fallback to wss:// if the TV rejects the connection. + """ try: - # Create a new client session if not provided - if self.client_session is None: - self.client_session = ClientSession() - self.created_client_session = True - - try: - uri = f"ws://{self.host}:{WS_PORT}" - main_ws = await self._ws_connect(uri) - # ClientConnectionError is raised when firmware reject WS_PORT - # WSServerHandshakeError is raised when firmware enforce using ssl - except (aiohttp.ClientConnectionError, aiohttp.WSServerHandshakeError): - uri = f"wss://{self.host}:{WSS_PORT}" - main_ws = await self._ws_connect(uri) - - # send hello - _LOGGER.debug("send(%s): hello", self.host) - await main_ws.send_json({"id": "hello", "type": "hello"}) - response = await main_ws.receive_json() - _LOGGER.debug("recv(%s): %s", self.host, response) - - if response["type"] == "hello": - self._hello_info = response["payload"] - else: - error = f"Invalid response type {response}" - raise WebOsTvCommandError(error) - - # send registration - _LOGGER.debug("send(%s): registration", self.host) - await main_ws.send_json(self.registration_msg()) - response = await main_ws.receive_json() - _LOGGER.debug("recv(%s): registration", self.host) - - if ( - response["type"] == "response" - and response["payload"]["pairingType"] == "PROMPT" - ): - response = await main_ws.receive_json() - _LOGGER.debug("recv(%s): pairing", self.host) - _LOGGER.debug( - "pairing(%s): type: %s, error: %s", - self.host, - response["type"], - response.get("error"), - ) - if response["type"] == "error": - raise WebOsTvPairError(response["error"]) - if response["type"] == "registered": - self.client_key = response["payload"]["client-key"] - - if not self.client_key: - error = "Client key not set, pairing failed." - raise WebOsTvPairError(error) - - self.callback_queues = {} - self.callback_tasks = {} - self.futures = {} - - handler_tasks.add(asyncio.create_task(self.consumer_handler(main_ws))) - self.connection = main_ws - - # open additional connection needed to send button commands - # the url is dynamically generated and returned from the ep.INPUT_SOCKET - # endpoint on the main connection - # create an empty consumer handler to keep ping/pong alive - sockres = await self.request(ep.INPUT_SOCKET) - inputsockpath = sockres["socketPath"] - input_ws = await self._ws_connect(inputsockpath) - handler_tasks.add( - asyncio.create_task(self.input_consumer_handler(input_ws)) - ) - self.input_connection = input_ws + uri = f"ws://{self.host}:{WS_PORT}" + return await self._ws_connect(uri) + # ClientConnectionError is raised when firmware reject WS_PORT + # WSServerHandshakeError is raised when firmware enforce using ssl + except (aiohttp.ClientConnectionError, aiohttp.WSServerHandshakeError): + uri = f"wss://{self.host}:{WSS_PORT}" + return await self._ws_connect(uri) + + def _ensure_client_session(self) -> None: + """Create a new client session if no client session provided.""" + if self.client_session is None: + self.client_session = ClientSession() + self.created_client_session = True + + async def _get_hello_info(self, ws: ClientWebSocketResponse) -> None: + """Get hello info.""" + _LOGGER.debug("send(%s): hello", self.host) + await ws.send_json({"id": "hello", "type": "hello"}) + response = await ws.receive_json() + _LOGGER.debug("recv(%s): %s", self.host, response) + + if response["type"] == "hello": + self._hello_info = response["payload"] + else: + error = f"Invalid response type {response}" + raise WebOsTvCommandError(error) - # set static state and subscribe to state updates - # avoid partial updates during initial subscription + async def _check_registration(self, ws: ClientWebSocketResponse) -> None: + """Check if the client is registered with the tv.""" + _LOGGER.debug("send(%s): registration", self.host) + await ws.send_json(self.registration_msg()) + response = await ws.receive_json() + _LOGGER.debug("recv(%s): registration", self.host) - self.do_state_update = False - self._system_info, self._software_info = await asyncio.gather( - self.get_system_info(), self.get_software_info() + if ( + response["type"] == "response" + and response["payload"]["pairingType"] == "PROMPT" + ): + response = await ws.receive_json() + _LOGGER.debug("recv(%s): pairing", self.host) + _LOGGER.debug( + "pairing(%s): type: %s, error: %s", + self.host, + response["type"], + response.get("error"), ) - subscribe_state_updates = { - self.subscribe_power_state(self.set_power_state), - self.subscribe_current_app(self.set_current_app_state), - self.subscribe_muted(self.set_muted_state), - self.subscribe_volume(self.set_volume_state), - self.subscribe_apps(self.set_apps_state), - self.subscribe_inputs(self.set_inputs_state), - self.subscribe_sound_output(self.set_sound_output_state), - self.subscribe_media_foreground_app(self.set_media_state), - } - subscribe_tasks = set() - for state_update in subscribe_state_updates: - subscribe_tasks.add(asyncio.create_task(state_update)) - await asyncio.wait(subscribe_tasks) - for task in subscribe_tasks: - with suppress(WebOsTvServiceNotFoundError): - task.result() - # set placeholder power state if not available - if not self._power_state: - self._power_state = {"state": "Unknown"} - self.do_state_update = True - if self.state_update_callbacks: - await self.do_state_update_callbacks() + if response["type"] == "error": + raise WebOsTvPairError(response["error"]) + if response["type"] == "registered": + self.client_key = response["payload"]["client-key"] - res.set_result(True) + if not self.client_key: + error = "Client key not set, pairing failed." + raise WebOsTvPairError(error) - await asyncio.wait(handler_tasks, return_when=asyncio.FIRST_COMPLETED) + async def _create_input_ws(self) -> ClientWebSocketResponse: + """Create input websocket connection. - except Exception as ex: # pylint: disable=broad-except - if isinstance(ex, TimeoutError): - _LOGGER.debug("timeout(%s): connection", self.host) - else: - _LOGGER.debug("exception(%s): %r", self.host, ex, exc_info=True) - if not res.done(): - res.set_exception(ex) - finally: - for callback_task in self.callback_tasks.values(): - if not callback_task.done(): - callback_task.cancel() + Open additional connection needed to send button commands + The url is dynamically generated and returned from the ep.INPUT_SOCKET + endpoint on the main connection. + """ + sockres = await self.request(ep.INPUT_SOCKET) + inputsockpath = sockres["socketPath"] + return await self._ws_connect(inputsockpath) - for task in handler_tasks: - if not task.done(): - task.cancel() + async def _get_states_and_subscribe_state_updates(self) -> None: + """Get initial states and subscribe to state updates. - for future in self.futures.values(): - future.cancel() + Avoid partial updates during initial subscription. + """ + self.do_state_update = False + self._system_info, self._software_info = await asyncio.gather( + self.get_system_info(), self.get_software_info() + ) + subscribe_state_updates = { + self.subscribe_power_state(self.set_power_state), + self.subscribe_current_app(self.set_current_app_state), + self.subscribe_muted(self.set_muted_state), + self.subscribe_volume(self.set_volume_state), + self.subscribe_apps(self.set_apps_state), + self.subscribe_inputs(self.set_inputs_state), + self.subscribe_sound_output(self.set_sound_output_state), + self.subscribe_media_foreground_app(self.set_media_state), + } + subscribe_tasks = set() + for state_update in subscribe_state_updates: + subscribe_tasks.add(asyncio.create_task(state_update)) + await asyncio.wait(subscribe_tasks) + for task in subscribe_tasks: + with suppress(WebOsTvServiceNotFoundError): + task.result() + # set placeholder power state if not available + if not self._power_state: + self._power_state = {"state": "Unknown"} + self.do_state_update = True + if self.state_update_callbacks: + await self.do_state_update_callbacks() - closeout = set() + def _clear_tv_states(self) -> None: + """Clear all TV states.""" + self._power_state = {} + self._current_app_id = None + self._muted = None + self._volume = None + self._current_channel = None + self._channel_info = None + self._channels = None + self._apps = {} + self._extinputs = {} + self._system_info = {} + self._software_info = {} + self._hello_info = {} + self._sound_output = None + self._media_state = [] + + def _cancel_tasks(self) -> None: + """Cancel all tasks.""" + for callback_task in self.callback_tasks.values(): + if not callback_task.done(): + callback_task.cancel() + + for task in self._rx_tasks: + if not task.done(): + task.cancel() + + for future in self.futures.values(): + future.cancel() + + async def _closeout_tasks( + self, + main_ws: ClientWebSocketResponse | None, + input_ws: ClientWebSocketResponse | None, + ) -> None: + """Cancel all tasks and close connections.""" + closeout = set() - callback_tasks = set(self.callback_tasks.values()) - if callback_tasks: - closeout.update(callback_tasks) + self._cancel_tasks() - closeout.update(handler_tasks) + if callback_tasks := set(self.callback_tasks.values()): + closeout.update(callback_tasks) - if main_ws is not None: - closeout.add(asyncio.create_task(main_ws.close())) - if input_ws is not None: - closeout.add(asyncio.create_task(input_ws.close())) - if self.created_client_session: - closeout.add(asyncio.create_task(self.close_client_session())) + closeout.update(self._rx_tasks) - self.connection = None - self.input_connection = None + if main_ws is not None: + closeout.add(asyncio.create_task(main_ws.close())) + if input_ws is not None: + closeout.add(asyncio.create_task(input_ws.close())) + if self.created_client_session: + closeout.add(asyncio.create_task(self.close_client_session())) - self.do_state_update = False + self.connection = None + self.input_connection = None + self.do_state_update = False + self._clear_tv_states() - self._power_state = {} - self._current_app_id = None - self._muted = None - self._volume = None - self._current_channel = None - self._channel_info = None - self._channels = None - self._apps = {} - self._extinputs = {} - self._system_info = {} - self._software_info = {} - self._hello_info = {} - self._sound_output = None - self._media_state = [] + for callback in self.state_update_callbacks: + closeout.add(asyncio.create_task(callback(self))) - for callback in self.state_update_callbacks: - closeout.add(asyncio.create_task(callback(self))) + if not closeout: + return - if closeout: - closeout_task = asyncio.create_task(asyncio.wait(closeout)) + closeout_task = asyncio.create_task(asyncio.wait(closeout)) + while not closeout_task.done(): + with suppress(asyncio.CancelledError): + await asyncio.shield(closeout_task) - while not closeout_task.done(): - with suppress(asyncio.CancelledError): - await asyncio.shield(closeout_task) + async def connect_handler(self, res: Future) -> None: + """Handle connection for webOS TV.""" + self._rx_tasks = set() + self.callback_queues = {} + self.callback_tasks = {} + self.futures = {} + main_ws: ClientWebSocketResponse | None = None + input_ws: ClientWebSocketResponse | None = None + self._ensure_client_session() + try: + main_ws = await self._create_main_ws() + await self._get_hello_info(main_ws) + await self._check_registration(main_ws) + self._rx_tasks.add(asyncio.create_task(self._rx_msgs_main_ws(main_ws))) + self.connection = main_ws + input_ws = await self._create_input_ws() + self._rx_tasks.add(asyncio.create_task(self._rx_msgs_input_ws(input_ws))) + self.input_connection = input_ws + await self._get_states_and_subscribe_state_updates() + res.set_result(True) + await asyncio.wait(self._rx_tasks, return_when=asyncio.FIRST_COMPLETED) + except TimeoutError: + _LOGGER.debug("timeout(%s): connection", self.host) + if not res.done(): + res.set_exception(asyncio.TimeoutError) + except Exception as ex: + _LOGGER.debug("exception(%s): %r", self.host, ex, exc_info=True) + if not res.done(): + res.set_exception(ex) + else: + raise + finally: + await self._closeout_tasks(main_ws, input_ws) @staticmethod async def callback_handler( @@ -338,8 +368,8 @@ def _process_text_message(self, data: str) -> None: elif future := self.futures.get(uid): future.set_result(msg) - async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: - """Callbacks consumer handler.""" + async def _rx_msgs_main_ws(self, web_socket: ClientWebSocketResponse) -> None: + """Receive messages from main websocket connection.""" async for raw_msg in web_socket: _LOGGER.debug("recv(%s): %s", self.host, raw_msg) if raw_msg.type is not WSMsgType.TEXT: @@ -347,8 +377,8 @@ async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: self._process_text_message(raw_msg.data) - async def input_consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: - """Input consumer handler. + async def _rx_msgs_input_ws(self, web_socket: ClientWebSocketResponse) -> None: + """Receive messages from input websocket connection. We are not expecting any messages from the input connection. This is just to keep the connection alive. diff --git a/ruff.toml b/ruff.toml index e50ba94..d047628 100644 --- a/ruff.toml +++ b/ruff.toml @@ -3,14 +3,9 @@ target-version = "py311" lint.select = ["ALL"] lint.ignore = [ - "BLE001", # Do not catch blind exception: `Exception` - "C901", # is too complex "COM812", # Trailing comma missing (conflicts with formatter) "D203", # 1 blank line required before class docstring (conflicts with `no-blank-line-before-class` (D211)) "D213", # Multi-line docstring summary should start at the second line (conflicts with multi-line-summary-first-line` (D212)) - "PLR0912", # Too many branches - "PLR0915", # Too many statements - "TRY301", # Abstract `raise` to an inner function ] From 509e29966861ffe40bf38d98853a866555d70785 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Wed, 29 Jan 2025 00:40:00 +0200 Subject: [PATCH 7/7] Remove shield from task --- aiowebostv/webos_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index dbfc3d2..acd7454 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -723,7 +723,7 @@ async def delete_subscription_handler(self, uid: int) -> None: task.cancel() while not task.done(): with suppress(asyncio.CancelledError): - await asyncio.shield(task) + await task del self.callback_queues[uid] async def subscribe(