From 6e26fbce8ec14f600182a6413050731b6948b68f Mon Sep 17 00:00:00 2001 From: iloveicedgreentea <31193909+iloveicedgreentea@users.noreply.github.com> Date: Sun, 19 Jan 2025 13:17:45 -0500 Subject: [PATCH] try fix notifications on reconnect, more robust tasks, timeouts to connections, add more locks --- madvr/madvr.py | 331 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 215 insertions(+), 116 deletions(-) diff --git a/madvr/madvr.py b/madvr/madvr.py index 01a35cf..78be728 100644 --- a/madvr/madvr.py +++ b/madvr/madvr.py @@ -4,7 +4,7 @@ import asyncio import logging -from typing import Any, Final, Iterable +from typing import Any, Coroutine, Final, Iterable from madvr.commands import Commands, Connections, Footer from madvr.consts import ( @@ -19,7 +19,7 @@ SMALL_DELAY, TASK_CPU_DELAY, ) -from madvr.errors import AckError, HeartBeatError, RetryExceededError +from madvr.errors import AckError, HeartBeatError from madvr.notifications import NotificationProcessor from madvr.wol import send_magic_packet @@ -104,26 +104,35 @@ def set_update_callback(self, callback: Any) -> None: self.update_callback = callback async def async_add_tasks(self) -> None: - """Add background tasks.""" + """Add background tasks with error handling.""" # loop can be passed from HA if not self.loop: self.loop = asyncio.get_event_loop() - task_queue = self.loop.create_task(self.task_handle_queue()) - self.tasks.append(task_queue) - - task_notif = self.loop.create_task(self.task_read_notifications()) - self.tasks.append(task_notif) - - task_hb = self.loop.create_task(self.send_heartbeat()) - self.tasks.append(task_hb) + async def wrapped_task(coro: Coroutine[Any, Any, Any], name: str) -> None: + """Wrapper to handle task errors.""" + try: + await coro + except asyncio.CancelledError: + self.logger.debug("Task %s was cancelled", name) + except Exception as e: + self.logger.exception("Task %s failed with error: %s", name, e) + # If a critical task fails, trigger reconnection + if name in ["notifications", "heartbeat"]: + await self._handle_power_off() - # this will only be cancelled on unload so thats fine - task_ping = self.loop.create_task(self.task_ping_until_alive()) - self.tasks.append(task_ping) + task_definitions = [ + (self.task_handle_queue(), "queue"), + (self.task_read_notifications(), "notifications"), + (self.send_heartbeat(), "heartbeat"), + (self.task_ping_until_alive(), "ping"), + (self.task_refresh_info(), "refresh"), + ] - task_refresh = self.loop.create_task(self.task_refresh_info()) - self.tasks.append(task_refresh) + # start tasks in wrapper + for coro, name in task_definitions: + task = self.loop.create_task(wrapped_task(coro, name)) + self.tasks.append(task) async def async_cancel_tasks(self) -> None: """Cancel all tasks.""" @@ -141,20 +150,30 @@ async def task_handle_queue(self) -> None: """Handle command queue.""" while True: await self.connection_event.wait() - while not self.command_queue.empty() and not self.stop_commands_flag.is_set(): + while ( + not self.command_queue.empty() and not self.stop_commands_flag.is_set() + ): command = await self.command_queue.get() self.logger.debug("sending queue command %s", command) try: await self.send_command(command) except NotImplementedError as err: self.logger.warning("Command not implemented: %s", err) - except (ConnectionError, ConnectionResetError, BrokenPipeError): - self.logger.warning("Task Queue: Envy seems to be disconnected") - except AttributeError: - self.logger.warning("Issue sending command from queue") - except RetryExceededError: - self.logger.warning("Retry exceeded for command %s", command) - except OSError as err: + except ( + ConnectionError, + ConnectionResetError, + BrokenPipeError, + OSError, + ) as err: + self.logger.warning( + "Connection error while sending command: %s", err + ) + # Put the command back in the queue unless it's a power command + if not any(cmd in ["PowerOff", "Standby"] for cmd in command): + await self.command_queue.put(command) + await self._handle_power_off() + break # Exit the inner loop to wait for reconnection + except Exception as err: self.logger.error("Unexpected error when sending command: %s", err) if self.stop_commands_flag.is_set(): @@ -166,68 +185,94 @@ async def task_handle_queue(self) -> None: async def task_read_notifications(self) -> None: """ - Read notifications from the server and update attributes + Read notifications from the server and update attributes. + The task maintains connection state and processes notifications continuously. """ while True: - # wait until the connection is established + # Wait for connection to be established await self.connection_event.wait() + try: - if self.reader: - msg = await asyncio.wait_for( - self.reader.read(self.read_limit), - timeout=self.command_read_timeout, + if not self.reader: + self.logger.warning("Reader not available") + await self._set_connected(False) + await asyncio.sleep(TASK_CPU_DELAY) + continue + + msg = await asyncio.wait_for( + self.reader.read(self.read_limit), + timeout=self.command_read_timeout, + ) + + # An empty message typically means the connection was closed + if not msg: + self.logger.warning( + "Empty message received - connection likely closed" ) - await self._process_notifications(msg.decode("utf-8")) - except TimeoutError: - self.logger.info("No notifications to read") - except ( - ConnectionResetError, - AttributeError, - BrokenPipeError, - OSError, - ) as err: - self.logger.error("Reading notifications failed or timed out: %s", err) + await self._handle_power_off() + continue + try: - # try to connect otherwise it will mark the device as offline - await self._reconnect() - except ConnectionError as e: - self.logger.error("Connection error when reading notifications: %s", e) + await self._process_notifications(msg.decode("utf-8")) + except UnicodeDecodeError as e: + self.logger.error("Failed to decode message: %s", e) + continue + + except asyncio.TimeoutError: + # This is normal - no notifications to read + await asyncio.sleep(TASK_CPU_DELAY) + continue + + except (ConnectionResetError, BrokenPipeError) as err: + self.logger.error("Connection error in notification task: %s", err) + # Connection was reset - mark as disconnected and trigger reconnect after + await self._handle_power_off() + continue + + except Exception as e: + self.logger.exception("Unexpected error in notification task: %s", e) + await asyncio.sleep(TASK_CPU_DELAY) continue await asyncio.sleep(TASK_CPU_DELAY) - continue async def send_heartbeat(self, once: bool = False) -> None: """ Send a heartbeat to keep connection open. - You should wrap this in try with OSError and asyncio.TimeoutError exceptions. - Raises HeartBeatError exception. + Args: + once: If True, only send one heartbeat instead of continuous """ async def perform_heartbeat() -> None: - await self._write_with_timeout(self.HEARTBEAT) + if not self.connected or not self.writer: + raise HeartBeatError("Connection not established") - async def handle_heartbeat_error( - err: TimeoutError | OSError | HeartBeatError, - ) -> None: - self.logger.error("Error when sending heartbeat: %s", err) - raise HeartBeatError("Error when sending heartbeat") from err + try: + async with self.lock: + if self.writer: + self.writer.write(self.HEARTBEAT) + await self.writer.drain() + self.logger.debug("Heartbeat sent") + except (ConnectionError, OSError) as err: + raise HeartBeatError(f"Failed to send heartbeat: {err}") from err if once: - try: - await perform_heartbeat() - except (TimeoutError, OSError) as err: - await handle_heartbeat_error(err) + await perform_heartbeat() return while not self.stop_heartbeat.is_set(): await self.connection_event.wait() + try: await perform_heartbeat() - except (TimeoutError, OSError) as err: - await handle_heartbeat_error(err) - finally: - await asyncio.sleep(self.heartbeat_interval) + except HeartBeatError as err: + self.logger.error("Heartbeat error: %s", err) + await self._handle_power_off() + # Don't retry immediately after error + await asyncio.sleep(self.heartbeat_interval * 2) + continue + + await asyncio.sleep(self.heartbeat_interval) async def task_ping_until_alive(self) -> None: """Check if the device is connectable and connect to it on success.""" @@ -254,7 +299,9 @@ async def task_ping_until_alive(self) -> None: try: await self.open_connection() except ConnectionError as err: - self.logger.error("Error opening connection after connectivity check: %s", err) + self.logger.error( + "Error opening connection after connectivity check: %s", err + ) else: self.logger.debug( "Device is not connectable, retrying in %s seconds", @@ -324,7 +371,9 @@ async def write_and_drain() -> None: async with self.lock: await asyncio.wait_for(write_and_drain(), timeout=self.connect_timeout) except TimeoutError: - self.logger.error("Write operation timed out after %s seconds", self.connect_timeout) + self.logger.error( + "Write operation timed out after %s seconds", self.connect_timeout + ) await self._reconnect() except (ConnectionResetError, OSError) as err: self.logger.error("Error writing to socket: %s", err) @@ -363,7 +412,9 @@ async def _reconnect(self) -> None: self.stop_commands_flag.clear() except (TimeoutError, HeartBeatError, OSError) as err: - self.logger.error("Heartbeat failed. Connection not established %s", err) + self.logger.error( + "Heartbeat failed. Connection not established %s", err + ) await self._set_connected(False) raise ConnectionError("Heartbeat failed") from err else: @@ -399,39 +450,77 @@ async def _clear_attr(self) -> None: self.update_callback(self.msg_dict) async def close_connection(self) -> None: - """close the connection""" - self.logger.debug("closing connection") - try: - if self.writer: + """Close the connection and clean up state.""" + self.logger.debug("Closing connection") + + if self.writer: + try: + # Try to send a graceful goodbye if possible + if not self.writer.is_closing(): + try: + async with self.lock: + self.writer.write(Connections.bye.value) + await self.writer.drain() + except (ConnectionError, OSError): + pass # Ignore errors during goodbye + self.writer.close() - await self.writer.wait_closed() - except (ConnectionResetError, AttributeError): - pass + try: + await asyncio.wait_for(self.writer.wait_closed(), timeout=2) + except asyncio.TimeoutError: + self.logger.warning("Timeout waiting for writer to close") + + except Exception as e: + self.logger.error("Error during connection close: %s", e) + self.writer = None self.reader = None await self._set_connected(False) await self._clear_attr() + self.logger.debug("Connection closed and state cleared") async def open_connection(self) -> None: - """Open a connection""" + """Open a connection with rate limiting.""" self.logger.debug("Starting open connection") - try: - await self._reconnect() - self.logger.debug("Connection opened") - except (AckError, ConnectionError) as err: - self.logger.error("Error opening connection: %s", err) - raise - # once connected, try to refresh data once in the case the device was turned connected to while on already - cmds = [ - ["GetIncomingSignalInfo"], - ["GetOutgoingSignalInfo"], - ["GetAspectRatio"], - ["GetMaskingRatio"], - ["GetMacAddress"], - ] - for cmd in cmds: - await self.add_command_to_queue(cmd) + # Add rate limiting for connection attempts + MAX_RETRIES = 3 + RETRY_DELAY = 15 # seconds + + for attempt in range(MAX_RETRIES): + try: + await self._reconnect() + self.logger.debug("Connection opened successfully") + + # Refresh initial device state + refresh_commands = [ + ["GetIncomingSignalInfo"], + ["GetOutgoingSignalInfo"], + ["GetAspectRatio"], + ["GetMaskingRatio"], + ["GetMacAddress"], + ] + + # Add commands with small delay between each to avoid overwhelming the device + for cmd in refresh_commands: + await self.add_command_to_queue(cmd) + await asyncio.sleep(0.1) + + return + + except (AckError, ConnectionError) as err: + self.logger.error( + "Error opening connection (attempt %d/%d): %s", + attempt + 1, + MAX_RETRIES, + err, + ) + if attempt < MAX_RETRIES - 1: # Don't sleep on last attempt + await asyncio.sleep( + min(RETRY_DELAY * 2**attempt, 300) + ) # Exponential backoff capped at 5min + + raise ConnectionError("Failed to open connection after multiple attempts") @property def connected(self) -> bool: @@ -461,7 +550,9 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]: bytes: the value to send in bytes str: the 'msg' field in the Enum used to filter notifications """ - self.logger.debug("raw_command: %s -- raw_command length: %s", raw_command, len(raw_command)) + self.logger.debug( + "raw_command: %s -- raw_command length: %s", raw_command, len(raw_command) + ) skip_val = False # HA seems to always send commands as a list even if you set them as a str @@ -518,7 +609,9 @@ async def _construct_command(self, raw_command: list[str]) -> tuple[bytes, str]: cmd = command_base + Footer.footer.value except KeyError as exc: - raise NotImplementedError("Incorrect parameter given for command") from exc + raise NotImplementedError( + "Incorrect parameter given for command" + ) from exc else: cmd = command_name + Footer.footer.value @@ -580,33 +673,39 @@ async def _update_ha_state(self) -> None: self.logger.error("Error updating HA: %s", err) async def power_on(self, mac: str = "") -> None: - """ - Power on the device - """ - - # use the detected mac or one that is supplied at init or function call - mac_to_use = self.mac_address or self.mac or mac - if mac_to_use: - # this will allow ping to trigger the connection - self.logger.debug("Turning on with mac %s", mac_to_use) - send_magic_packet(mac_to_use, logger=self.logger) - else: - # without wol, you cant power on the device - self.logger.warning("No mac provided, no action to take. Implement your own WOL automation") + """Power on the device with improved state handling.""" + async with self.lock: # Prevent race conditions + # Reset flags before starting + self.stop_commands_flag.clear() + + # use the detected mac or one that is supplied at init or function call + mac_to_use = self.mac_address or self.mac or mac + if mac_to_use: + self.logger.debug("Turning on with mac %s", mac_to_use) + send_magic_packet(mac_to_use, logger=self.logger) + + # Reset the powered_off flag after sending WOL + # This allows the ping task to start checking connectivity + self.powered_off_recently = False + else: + self.logger.warning( + "No mac provided, no action to take. Implement your own WOL automation" + ) async def power_off(self, standby: bool = False) -> None: - """ - turn off madvr or set to standby + """Turn off madvr or set to standby with improved state handling.""" + async with self.lock: # Prevent race conditions with other operations + self.stop() + self.powered_off_recently = True - standby: bool -> standby instead of poweroff if true - """ - self.stop() - # set the flag to delay the ping task to avoid race conditions - self.powered_off_recently = True - if self.connected: - try: - await self.send_command(["Standby"] if standby else ["PowerOff"]) - except ConnectionError as err: - self.logger.error("Error sending power off command: %s", err) + if self.connected: + try: + await self.send_command(["Standby"] if standby else ["PowerOff"]) + # Give the device a moment to process the command + await asyncio.sleep(SMALL_DELAY) + except ConnectionError: + self.logger.warning( + "Connection error while sending power off command - device might already be off" + ) - await self.close_connection() # + await self.close_connection()