From d8165cc2295be9511905c2de82d19799c2fba997 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 18:03:59 +0200 Subject: [PATCH 1/5] Move message processing to its own function --- aiowebostv/webos_client.py | 101 ++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 33f368d..7e278ad 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -77,6 +77,8 @@ def __init__( self._volume_step_delay: timedelta | None = None self._loop = asyncio.get_running_loop() self._media_state: list[dict[str, Any]] = [] + self.callback_queues: dict[int, Queue[dict[str, Any]]] = {} + self.callback_tasks: dict[int, Task] = {} async def connect(self) -> bool: """Connect to webOS TV device.""" @@ -196,11 +198,7 @@ async def connect_handler(self, res: Future) -> None: self.callbacks = {} self.futures = {} - handler_tasks.add( - asyncio.create_task( - self.consumer_handler(main_ws, self.callbacks, self.futures) - ) - ) + handler_tasks.add(asyncio.create_task(self.consumer_handler(main_ws))) self.connection = main_ws # open additional connection needed to send button commands @@ -211,7 +209,7 @@ async def connect_handler(self, res: Future) -> None: inputsockpath = sockres["socketPath"] input_ws = await self._ws_connect(inputsockpath) handler_tasks.add( - asyncio.create_task(self.consumer_handler(input_ws, None, None)) + asyncio.create_task(self.input_consumer_handler(input_ws)) ) self.input_connection = input_ws @@ -258,6 +256,10 @@ async def connect_handler(self, res: Future) -> None: if not res.done(): res.set_exception(ex) finally: + for callback_task in self.callback_tasks.values(): + if not callback_task.done(): + callback_task.cancel() + for task in handler_tasks: if not task.done(): task.cancel() @@ -266,6 +268,11 @@ async def connect_handler(self, res: Future) -> None: future.cancel() closeout = set() + + callback_tasks = set(self.callback_tasks.values()) + if callback_tasks: + closeout.update(callback_tasks) + closeout.update(handler_tasks) if main_ws is not None: @@ -294,6 +301,8 @@ async def connect_handler(self, res: Future) -> None: self._hello_info = {} self._sound_output = None self._media_state = [] + self.callback_queues = {} + self.callback_tasks = {} for callback in self.state_update_callbacks: closeout.add(asyncio.create_task(callback(self))) @@ -320,51 +329,45 @@ async def callback_handler( if future is not None and not future.done(): future.set_result(msg) - async def consumer_handler( - self, - web_socket: ClientWebSocketResponse, - callbacks: dict[int, Callable] | None, - futures: dict[int, Future] | None, - ) -> None: - """Callbacks consumer handler.""" - callback_queues: dict[int, Queue[dict[str, Any]]] = {} - callback_tasks: dict[int, Task] = {} + async def _process_text_message(self, data: str) -> None: + """Process text message.""" + if not self.callbacks and not self.futures: + return - try: - async for raw_msg in web_socket: - _LOGGER.debug("recv(%s): %s", self.host, raw_msg) - if raw_msg.type is not WSMsgType.TEXT: - break - - if callbacks or futures: - msg = json.loads(raw_msg.data) - uid = msg.get("id") - callback = self.callbacks.get(uid) - future = self.futures.get(uid) - if callback is not None: - if uid not in callback_tasks: - queue: Queue[dict[str, Any]] = asyncio.Queue() - callback_queues[uid] = queue - callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, future) - ) - callback_queues[uid].put_nowait(msg) - elif future is not None and not future.done(): - self.futures[uid].set_result(msg) + msg = json.loads(data) + uid = msg.get("id") + callback = self.callbacks.get(uid) + future = self.futures.get(uid) + if callback is not None: + if uid not in self.callback_tasks: + queue: Queue[dict[str, Any]] = asyncio.Queue() + self.callback_queues[uid] = queue + self.callback_tasks[uid] = asyncio.create_task( + self.callback_handler(queue, callback, future) + ) + self.callback_queues[uid].put_nowait(msg) + elif future is not None and not future.done(): + self.futures[uid].set_result(msg) - finally: - for task in callback_tasks.values(): - if not task.done(): - task.cancel() + async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: + """Callbacks consumer handler.""" + async for raw_msg in web_socket: + _LOGGER.debug("recv(%s): %s", self.host, raw_msg) + if raw_msg.type is not WSMsgType.TEXT: + break - tasks = set(callback_tasks.values()) + await self._process_text_message(raw_msg.data) - if tasks: - closeout_task = asyncio.create_task(asyncio.wait(tasks)) + async def input_consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: + """Input consumer handler. - while not closeout_task.done(): - with suppress(asyncio.CancelledError): - await asyncio.shield(closeout_task) + We are not expecting any messages from the input connection. + This is just to keep the connection alive. + """ + async for raw_msg in web_socket: + _LOGGER.debug("input recv(%s): %s", self.host, raw_msg) + if raw_msg.type is not WSMsgType.TEXT: + break # manage state @property @@ -480,11 +483,7 @@ def clear_state_update_callbacks(self) -> None: async def do_state_update_callbacks(self) -> None: """Call user state update callback.""" - callbacks = set() - for callback in self.state_update_callbacks: - callbacks.add(callback(self)) - - if callbacks: + if callbacks := {callback(self) for callback in self.state_update_callbacks}: await asyncio.gather(*callbacks) async def set_power_state(self, payload: dict[str, bool | str]) -> None: From c6500b6495365daeb50f6346fafaf6eb23c9eb12 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 20:22:16 +0200 Subject: [PATCH 2/5] pre-create the queue --- aiowebostv/webos_client.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 7e278ad..867f74d 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -336,17 +336,9 @@ async def _process_text_message(self, data: str) -> None: msg = json.loads(data) uid = msg.get("id") - callback = self.callbacks.get(uid) - future = self.futures.get(uid) - if callback is not None: - if uid not in self.callback_tasks: - queue: Queue[dict[str, Any]] = asyncio.Queue() - self.callback_queues[uid] = queue - self.callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, future) - ) - self.callback_queues[uid].put_nowait(msg) - elif future is not None and not future.done(): + if queue := self.callback_queues.get(uid): + queue.put_nowait(msg) + elif self.futures[uid] is not None: self.futures[uid].set_result(msg) async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: @@ -689,12 +681,24 @@ async def subscribe( uid = self.command_count self.command_count += 1 self.callbacks[uid] = callback + queue: Queue[dict[str, Any]] = asyncio.Queue() + self.callback_queues[uid] = queue + self.callback_tasks[uid] = asyncio.create_task( + self.callback_handler(queue, callback, self.futures.get(uid)) + ) try: return await self.request( uri, payload=payload, cmd_type="subscribe", uid=uid ) except Exception: del self.callbacks[uid] + task = self.callback_tasks.pop(uid) + if not task.done(): + task.cancel() + while not task.done(): + with suppress(asyncio.CancelledError): + await asyncio.shield(task) + del self.callback_queues[uid] raise async def input_command(self, message: str) -> None: From 41298740fb3167d26016018fc8881c58e0200e62 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 22:19:19 +0200 Subject: [PATCH 3/5] pre-create a future --- aiowebostv/webos_client.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 867f74d..430bfb6 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -318,7 +318,7 @@ async def connect_handler(self, res: Future) -> None: async def callback_handler( queue: Queue[dict[str, Any]], callback: Callable, - future: Future[dict[str, Any]] | None, + future: Future[dict[str, Any]], ) -> None: """Handle callbacks.""" with suppress(asyncio.CancelledError): @@ -326,7 +326,7 @@ async def callback_handler( msg = await queue.get() payload = msg.get("payload") await callback(payload) - if future is not None and not future.done(): + if not future.done(): future.set_result(msg) async def _process_text_message(self, data: str) -> None: @@ -336,10 +336,12 @@ async def _process_text_message(self, data: str) -> None: msg = json.loads(data) uid = msg.get("id") + # if we have a callback for this message, put it in the queue + # let the callback handle the message and mark the future as done if queue := self.callback_queues.get(uid): queue.put_nowait(msg) - elif self.futures[uid] is not None: - self.futures[uid].set_result(msg) + elif future := self.futures.get(uid): + future.set_result(msg) async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: """Callbacks consumer handler.""" @@ -634,8 +636,10 @@ async def request( if uid is None: uid = self.command_count self.command_count += 1 - res = self._loop.create_future() - self.futures[uid] = res + res = self._loop.create_future() + self.futures[uid] = res + else: + res = self.futures[uid] try: await self.command(cmd_type, uri, payload, uid) except (asyncio.CancelledError, WebOsTvCommandError): @@ -677,14 +681,19 @@ async def request( async def subscribe( self, callback: Callable, uri: str, payload: dict[str, Any] | None = None ) -> dict[str, Any]: - """Subscribe to updates.""" + """Subscribe to updates. + + Subsciption use a fixed uid so we need to pre-create a future, + Create a queue to store the messages and a task to handle the messages. + """ uid = self.command_count self.command_count += 1 + self.futures[uid] = future = self._loop.create_future() self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() self.callback_queues[uid] = queue self.callback_tasks[uid] = asyncio.create_task( - self.callback_handler(queue, callback, self.futures.get(uid)) + self.callback_handler(queue, callback, future) ) try: return await self.request( From d19372f20f86cf34b5844b305262d866b3fd9c68 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Sun, 26 Jan 2025 23:08:58 +0200 Subject: [PATCH 4/5] Move blocks to named function --- aiowebostv/webos_client.py | 44 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index 430bfb6..d4baf25 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -678,16 +678,12 @@ async def request( return payload - async def subscribe( - self, callback: Callable, uri: str, payload: dict[str, Any] | None = None - ) -> dict[str, Any]: - """Subscribe to updates. + async def create_subscription_handler(self, uid: int, callback: Callable) -> None: + """Create a subscription handler for a given uid. - Subsciption use a fixed uid so we need to pre-create a future, - Create a queue to store the messages and a task to handle the messages. + Create a queue to store the messages, a task to handle the messages + and a future to signal first subscription update processed. """ - uid = self.command_count - self.command_count += 1 self.futures[uid] = future = self._loop.create_future() self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() @@ -695,19 +691,35 @@ async def subscribe( self.callback_tasks[uid] = asyncio.create_task( self.callback_handler(queue, callback, future) ) + + async def delete_subscription_handler(self, uid: int) -> None: + """Delete a subscription handler for a given uid.""" + del self.callbacks[uid] + task = self.callback_tasks.pop(uid) + if not task.done(): + task.cancel() + while not task.done(): + with suppress(asyncio.CancelledError): + await asyncio.shield(task) + del self.callback_queues[uid] + + async def subscribe( + self, callback: Callable, uri: str, payload: dict[str, Any] | None = None + ) -> dict[str, Any]: + """Subscribe to updates. + + Subsciption use a fixed uid, pre-create a future and a handler. + """ + uid = self.command_count + self.command_count += 1 + await self.create_subscription_handler(uid, callback) + try: return await self.request( uri, payload=payload, cmd_type="subscribe", uid=uid ) except Exception: - del self.callbacks[uid] - task = self.callback_tasks.pop(uid) - if not task.done(): - task.cancel() - while not task.done(): - with suppress(asyncio.CancelledError): - await asyncio.shield(task) - del self.callback_queues[uid] + await self.delete_subscription_handler(uid) raise async def input_command(self, message: str) -> None: From c82547d15cebe25a7bf8cdc0d65d5c37c9d20cf7 Mon Sep 17 00:00:00 2001 From: Shay Levy Date: Mon, 27 Jan 2025 00:34:50 +0200 Subject: [PATCH 5/5] do not store subsciption callbacks --- aiowebostv/webos_client.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/aiowebostv/webos_client.py b/aiowebostv/webos_client.py index d4baf25..12a9e57 100644 --- a/aiowebostv/webos_client.py +++ b/aiowebostv/webos_client.py @@ -56,7 +56,6 @@ def __init__( self.connect_result: Future[bool] | None = None self.connection: ClientWebSocketResponse | None = None self.input_connection: ClientWebSocketResponse | None = None - self.callbacks: dict[int, Callable] = {} self.futures: dict[int, Future[dict[str, Any]]] = {} self._power_state: dict[str, Any] = {} self._current_app_id: str | None = None @@ -195,7 +194,8 @@ async def connect_handler(self, res: Future) -> None: error = "Client key not set, pairing failed." raise WebOsTvPairError(error) - self.callbacks = {} + self.callback_queues = {} + self.callback_tasks = {} self.futures = {} handler_tasks.add(asyncio.create_task(self.consumer_handler(main_ws))) @@ -301,8 +301,6 @@ async def connect_handler(self, res: Future) -> None: self._hello_info = {} self._sound_output = None self._media_state = [] - self.callback_queues = {} - self.callback_tasks = {} for callback in self.state_update_callbacks: closeout.add(asyncio.create_task(callback(self))) @@ -329,11 +327,8 @@ async def callback_handler( if not future.done(): future.set_result(msg) - async def _process_text_message(self, data: str) -> None: + def _process_text_message(self, data: str) -> None: """Process text message.""" - if not self.callbacks and not self.futures: - return - msg = json.loads(data) uid = msg.get("id") # if we have a callback for this message, put it in the queue @@ -350,7 +345,7 @@ async def consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: if raw_msg.type is not WSMsgType.TEXT: break - await self._process_text_message(raw_msg.data) + self._process_text_message(raw_msg.data) async def input_consumer_handler(self, web_socket: ClientWebSocketResponse) -> None: """Input consumer handler. @@ -685,7 +680,6 @@ async def create_subscription_handler(self, uid: int, callback: Callable) -> Non and a future to signal first subscription update processed. """ self.futures[uid] = future = self._loop.create_future() - self.callbacks[uid] = callback queue: Queue[dict[str, Any]] = asyncio.Queue() self.callback_queues[uid] = queue self.callback_tasks[uid] = asyncio.create_task( @@ -694,7 +688,6 @@ async def create_subscription_handler(self, uid: int, callback: Callable) -> Non async def delete_subscription_handler(self, uid: int) -> None: """Delete a subscription handler for a given uid.""" - del self.callbacks[uid] task = self.callback_tasks.pop(uid) if not task.done(): task.cancel()