diff --git a/custom_components/dahua/__init__.py b/custom_components/dahua/__init__.py index 26ab414..dc6d067 100644 --- a/custom_components/dahua/__init__.py +++ b/custom_components/dahua/__init__.py @@ -17,7 +17,7 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.const import EVENT_HOMEASSISTANT_STOP -from custom_components.dahua.thread import DahuaEventThread +from custom_components.dahua.thread import DahuaEventThread, DahuaVtoEventThread from . import dahua_utils from .client import DahuaClient @@ -56,16 +56,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): username = entry.data.get(CONF_USERNAME) password = entry.data.get(CONF_PASSWORD) address = entry.data.get(CONF_ADDRESS) - port = entry.data.get(CONF_PORT) - rtsp_port = entry.data.get(CONF_RTSP_PORT) + port = int(entry.data.get(CONF_PORT)) + rtsp_port = int(entry.data.get(CONF_RTSP_PORT)) events = entry.data.get(CONF_EVENTS) - session = async_get_clientsession(hass) - dahua_client = DahuaClient( - username, password, address, port, rtsp_port, session - ) - - coordinator = DahuaDataUpdateCoordinator(hass, dahua_client, events=events) + coordinator = DahuaDataUpdateCoordinator(hass, events=events, address=address, port=port, rtsp_port=rtsp_port, + username=username, password=password) await coordinator.async_config_entry_first_refresh() if not coordinator.last_update_success: @@ -83,8 +79,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): entry.add_update_listener(async_reload_entry) - await coordinator.async_start_event_listener() - entry.async_on_unload( hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, coordinator.async_stop) ) @@ -95,9 +89,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): class DahuaDataUpdateCoordinator(DataUpdateCoordinator): """Class to manage fetching data from the API.""" - def __init__(self, hass: HomeAssistant, dahua_client: DahuaClient, events: list) -> None: + def __init__(self, hass: HomeAssistant, events: list, address: str, port: int, rtsp_port: int, username: str, + password: str) -> None: """Initialize.""" - self.client: DahuaClient = dahua_client + self.client: DahuaClient = DahuaClient(username, password, address, port, rtsp_port, + async_get_clientsession(hass)) self.dahua_event: DahuaEventThread self.platforms = [] self.initialized = False @@ -112,7 +108,10 @@ def __init__(self, hass: HomeAssistant, dahua_client: DahuaClient, events: list) self._supports_disarming_linkage = False # This thread is what connects to the cameras event stream and fires on_receive when there's an event - self.dahua_event = DahuaEventThread(hass, dahua_client, self.on_receive, events) + self.dahua_event = DahuaEventThread(hass, self.client, self.on_receive, events) + # This thread will connect to VTO devices (Dahua doorbells) + self._dahua_vto_event_thread = DahuaVtoEventThread(hass, self.client, self.on_receive_vto_event, host=address, + port=5000, username=username, password=password) # A dictionary of event name (CrossLineDetection, VideoMotion, etc) to a listener for that event self._dahua_event_listeners: dict[str, CALLBACK_TYPE] = dict() @@ -124,14 +123,19 @@ def __init__(self, hass: HomeAssistant, dahua_client: DahuaClient, events: list) super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL_SECONDS) async def async_start_event_listener(self): - """ setup """ + """ Starts the event listeners for IP cameras (this does not work for doorbells (VTO)) """ if self.events is not None: self.dahua_event.start() + async def async_start_vto_event_listener(self): + """ Starts the event listeners for doorbells (VTO). This will not work for IP cameras""" + if self._dahua_vto_event_thread is not None: + self._dahua_vto_event_thread.start() + async def async_stop(self, event: Any): """ Stop anything we need to stop """ - if self.dahua_event is not None: - self.dahua_event.stop() + self.dahua_event.stop() + self._dahua_vto_event_thread.stop() async def _async_update_data(self): """Reload the camera information""" @@ -139,14 +143,14 @@ async def _async_update_data(self): data = {} if not self.initialized: - results = await asyncio.gather( + responses = await asyncio.gather( self.client.async_get_system_info(), self.client.async_get_machine_name(), self.client.get_software_version(), ) - for result in results: - data.update(result) + for response in responses: + data.update(response) device_type = data.get("deviceType") if device_type == "IP Camera": @@ -168,6 +172,16 @@ async def _async_update_data(self): except ClientError as exception: self._supports_disarming_linkage = False + is_doorbell = self.is_doorbell() + + if not is_doorbell: + # Start the event listeners for IP cameras + await self.async_start_event_listener() + + if is_doorbell: + # Start the event listeners for door bells (VTO) + await self.async_start_vto_event_listener() + self.initialized = True # Figure out which APIs we need to call and then fan out and gather the results @@ -189,6 +203,36 @@ async def _async_update_data(self): except Exception as exception: raise UpdateFailed() from exception + def on_receive_vto_event(self, event: dict): + self.hass.bus.fire("dahua_event_received", event) + + # Example event: + # {'id': 8, 'method': 'client.notifyEventStream', 'params': {'SID': 513, 'eventList': [ + # {'Action': 'Start', 'Code': 'VideoMotion', + # 'Data': {'LocaleTime': '2021-06-19 15:36:58', 'UTC': 1624088218.0}, 'Index': 0}]}, 'session': 1400947312} + + # This is the event code, example: VideoMotion, CrossLineDetection, etc + event_name = event.get("Code", {}) + + listener = self._dahua_event_listeners.get(event_name) + if listener is not None: + action = event.get("Action", "") + if action == "Start": + self._dahua_event_timestamp[event_name] = int(time.time()) + listener() + elif action == "Stop": + self._dahua_event_timestamp[event_name] = 0 + listener() + elif action == "Pulse": + state = event.get("Data", {}).get("State", 0) + if state == 1: + # button pressed + self._dahua_event_timestamp[event_name] = int(time.time()) + listener() + else: + self._dahua_event_timestamp[event_name] = 0 + listener() + def on_receive(self, data_bytes: bytes): """ Takes in bytes from the Dahua event stream, converts to a string, parses to a dict and fires an event with the data on the HA event bus @@ -238,7 +282,7 @@ def on_receive(self, data_bytes: bytes): # This is the event code, example: VideoMotion, CrossLineDetection, etc event_name = event["Code"] - listener = self._dahua_event_listeners[event_name] + listener = self._dahua_event_listeners.get(event_name) if listener is not None: action = event["action"] if action == "Start": @@ -276,6 +320,12 @@ def supports_security_light(self) -> bool: """ return "-AS-PV" in self.model + def is_doorbell(self) -> bool: + """ + Returns true if this is a doorbell (VTO) + """ + return self.model.upper().startswith("VTO") or self.model.upper().startswith("DHI") + def supports_infrared_light(self) -> bool: """ Returns true if this camera has an infrared light. For example, the IPC-HDW3849HP-AS-PV does not, but most diff --git a/custom_components/dahua/binary_sensor.py b/custom_components/dahua/binary_sensor.py index 342b4d4..24e4ade 100644 --- a/custom_components/dahua/binary_sensor.py +++ b/custom_components/dahua/binary_sensor.py @@ -7,7 +7,7 @@ from .const import ( MOTION_SENSOR_DEVICE_CLASS, - DOMAIN, SAFETY_DEVICE_CLASS, CONNECTIVITY_DEVICE_CLASS, + DOMAIN, SAFETY_DEVICE_CLASS, CONNECTIVITY_DEVICE_CLASS, SOUND_DEVICE_CLASS, ) from .entity import DahuaBaseEntity @@ -16,6 +16,7 @@ NAME_OVERRIDES = { "VideoMotion": "Motion Alarm", "CrossLineDetection": "Cross Line Alarm", + "BackKeyLight": "Button Pressed", # For VTO devices (doorbells) } # Override the device class for events @@ -29,17 +30,24 @@ "StorageFailure": CONNECTIVITY_DEVICE_CLASS, "StorageLowSpace": SAFETY_DEVICE_CLASS, "FireWarning": SAFETY_DEVICE_CLASS, + "BackKeyLight": SOUND_DEVICE_CLASS, } async def async_setup_entry(hass: HomeAssistant, entry, async_add_devices): """Setup binary_sensor platform.""" - coordinator = hass.data[DOMAIN][entry.entry_id] + coordinator: DahuaDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id] sensors: list[DahuaEventSensor] = [] for event_name in coordinator.get_event_list(): sensors.append(DahuaEventSensor(coordinator, entry, event_name)) + # For doorbells we'll just add these since most people will want them + if coordinator.is_doorbell(): + sensors.append(DahuaEventSensor(coordinator, entry, "BackKeyLight")) + sensors.append(DahuaEventSensor(coordinator, entry, "Invite")) + sensors.append(DahuaEventSensor(coordinator, entry, "CallNoAnswered")) + if sensors: async_add_devices(sensors) diff --git a/custom_components/dahua/const.py b/custom_components/dahua/const.py index cdc5fca..b7b6a5b 100644 --- a/custom_components/dahua/const.py +++ b/custom_components/dahua/const.py @@ -19,6 +19,7 @@ MOTION_SENSOR_DEVICE_CLASS = "motion" SAFETY_DEVICE_CLASS = "safety" CONNECTIVITY_DEVICE_CLASS = "connectivity" +SOUND_DEVICE_CLASS = "sound" # Platforms BINARY_SENSOR = "binary_sensor" diff --git a/custom_components/dahua/thread.py b/custom_components/dahua/thread.py index 65c613b..b51589f 100644 --- a/custom_components/dahua/thread.py +++ b/custom_components/dahua/thread.py @@ -1,12 +1,14 @@ """ Dahua Thread """ import asyncio +import sys import threading import logging import time from homeassistant.core import HomeAssistant from custom_components.dahua.client import DahuaClient +from custom_components.dahua.vto import DahuaVTOClient _LOGGER: logging.Logger = logging.getLogger(__package__) @@ -29,7 +31,7 @@ def run(self): self.started = True _LOGGER.debug("Starting DahuaEventThread") - while 1: + while True: # submit the coroutine to the event loop thread coro = self.client.stream_events(self.on_receive, self.events) future = asyncio.run_coroutine_threadsafe(coro, self.hass.loop) @@ -58,3 +60,62 @@ def run(self): def stop(self): """ Signals to the thread loop that we should stop """ self.started = False + + +class DahuaVtoEventThread(threading.Thread): + """Connects to device and subscribes to events. Mainly to capture motion detection events. """ + + def __init__(self, hass: HomeAssistant, client: DahuaClient, on_receive_vto_event, host: str, + port: int, username: str, password: str): + """Construct a thread listening for events.""" + threading.Thread.__init__(self) + self.hass = hass + self.stopped = threading.Event() + self.on_receive_vto_event = on_receive_vto_event + self.client = client + self.started = False + self._host = host + self._port = port + self._username = username + self._password = password + self._is_ssl = False + + def run(self): + """Fetch VTO events""" + self.started = True + _LOGGER.debug("Starting DahuaVtoEventThread") + + while True: + try: + if not self.started: + _LOGGER.debug("Exiting DahuaVtoEventThread") + return + + _LOGGER.info("Connecting to VTO event stream") + + # TODO: How do I integrate this in with the HA loop? Does it even matter? I think so because + # how well do we know when we are shutting down HA? + loop = asyncio.new_event_loop() + + client = loop.create_connection( + lambda: DahuaVTOClient(self._host, self._username, self._password, self._is_ssl, + self.on_receive_vto_event), host=self._host, port=self._port) + loop.run_until_complete(client) + loop.run_forever() + loop.close() + + _LOGGER.warning("Disconnected from VTO, will try to connect in 5 seconds") + + time.sleep(5) + + except Exception as ex: + exc_type, exc_obj, exc_tb = sys.exc_info() + line = exc_tb.tb_lineno + + _LOGGER.error(f"Connection to VTO failed will try to connect in 30 seconds, error: {ex}, Line: {line}") + + time.sleep(30) + + def stop(self): + """ Signals to the thread loop that we should stop """ + self.started = False diff --git a/custom_components/dahua/vto.py b/custom_components/dahua/vto.py new file mode 100644 index 0000000..3c120b0 --- /dev/null +++ b/custom_components/dahua/vto.py @@ -0,0 +1,408 @@ +""" +Copied and modified from https://github.com/elad-bar/DahuaVTO2MQTT +Thanks to @elad-bar +""" +import struct +import sys +import logging +import json +import asyncio +import hashlib +from threading import Timer +from typing import Optional, Callable +import requests +from requests.auth import HTTPDigestAuth + +PROTOCOLS = { + True: "https", + False: "http" +} + +_LOGGER: logging.Logger = logging.getLogger(__package__) + +DAHUA_DEVICE_TYPE = "deviceType" +DAHUA_SERIAL_NUMBER = "serialNumber" +DAHUA_VERSION = "version" +DAHUA_BUILD_DATE = "buildDate" + +DAHUA_GLOBAL_LOGIN = "global.login" +DAHUA_GLOBAL_KEEPALIVE = "global.keepAlive" +DAHUA_EVENT_MANAGER_ATTACH = "eventManager.attach" +DAHUA_CONFIG_MANAGER_GETCONFIG = "configManager.getConfig" +DAHUA_MAGICBOX_GETSOFTWAREVERSION = "magicBox.getSoftwareVersion" +DAHUA_MAGICBOX_GETDEVICETYPE = "magicBox.getDeviceType" + +DAHUA_ALLOWED_DETAILS = [ + DAHUA_DEVICE_TYPE, + DAHUA_SERIAL_NUMBER +] + +ENDPOINT_ACCESS_CONTROL = "accessControl.cgi?action=openDoor&UserID=101&Type=Remote&channel=" + + +class DahuaVTOClient(asyncio.Protocol): + requestId: int + sessionId: int + keep_alive_interval: int + username: str + password: str + realm: Optional[str] + random: Optional[str] + messages: [] + dahua_details: {} + base_url: str + hold_time: int + lock_status: {} + auth: HTTPDigestAuth + data_handlers: {} + + def __init__(self, host: str, username: str, password: str, is_ssl: bool, on_receive_vto_event): + self.dahua_details = {} + self.host = host + self.username = username + self.password = password + self.is_ssl = is_ssl + self.base_url = f"{PROTOCOLS[self.is_ssl]}://{self.host}/cgi-bin/" + self.auth = HTTPDigestAuth(self.username, self.password) + self.realm = None + self.random = None + self.request_id = 1 + self.sessionId = 0 + self.keep_alive_interval = 0 + self.transport = None + self.hold_time = 0 + self.lock_status = {} + self.data_handlers = {} + + # This is the hook back into HA + self.on_receive_vto_event = on_receive_vto_event + + self._loop = asyncio.get_event_loop() + + def connection_made(self, transport): + _LOGGER.debug("Connection established") + + try: + self.transport = transport + self.pre_login() + + except Exception as ex: + exc_type, exc_obj, exc_tb = sys.exc_info() + + _LOGGER.error(f"Failed to handle message, error: {ex}, Line: {exc_tb.tb_lineno}") + + def data_received(self, data): + try: + message = self.parse_response(data) + _LOGGER.debug(f"Data received: {message}") + + message_id = message.get("id") + + handler: Callable = self.data_handlers.get(message_id, self.handle_default) + handler(message) + + except Exception as ex: + exc_type, exc_obj, exc_tb = sys.exc_info() + + _LOGGER.error(f"Failed to handle message, error: {ex}, Line: {exc_tb.tb_lineno}") + + def handle_notify_event_stream(self, params): + try: + event_list = params.get("eventList") + + for message in event_list: + code = message.get("Code") + + for k in self.dahua_details: + if k in DAHUA_ALLOWED_DETAILS: + message[k] = self.dahua_details.get(k) + + self.on_receive_vto_event(message) + + except Exception as ex: + exc_type, exc_obj, exc_tb = sys.exc_info() + + _LOGGER.error(f"Failed to handle event, error: {ex}, Line: {exc_tb.tb_lineno}") + + def handle_default(self, message): + _LOGGER.info(f"Data received without handler: {message}") + + def eof_received(self): + _LOGGER.info('Server sent EOF message') + + self._loop.stop() + + def connection_lost(self, exc): + _LOGGER.error('server closed the connection') + + self._loop.stop() + + def send(self, action, handler, params=None): + if params is None: + params = {} + + self.request_id += 1 + + message_data = { + "id": self.request_id, + "session": self.sessionId, + "magic": "0x1234", + "method": action, + "params": params + } + + self.data_handlers[self.request_id] = handler + + if not self.transport.is_closing(): + message = self.convert_message(message_data) + + self.transport.write(message) + + @staticmethod + def convert_message(data): + message_data = json.dumps(data, indent=4) + + header = struct.pack(">L", 0x20000000) + header += struct.pack(">L", 0x44484950) + header += struct.pack(">d", 0) + header += struct.pack("