diff --git a/homekit/__init__.py b/homekit/__init__.py index 72b41f30..0d84fbc8 100644 --- a/homekit/__init__.py +++ b/homekit/__init__.py @@ -18,17 +18,17 @@ 'Controller', 'BluetoothAdapterError', 'AccessoryDisconnectedError', 'AccessoryNotFoundError', 'AlreadyPairedError', 'AuthenticationError', 'BackoffError', 'BusyError', 'CharacteristicPermissionError', 'ConfigLoadingError', 'ConfigSavingError', 'ConfigurationError', 'FormatError', 'HomeKitException', - 'HttpException', 'IncorrectPairingIdError', 'InvalidAuthTagError', 'InvalidError', 'InvalidSignatureError', - 'MaxPeersError', 'MaxTriesError', 'ProtocolError', 'RequestRejected', 'UnavailableError', 'UnknownError', - 'UnpairedError' + 'HttpException', 'IncorrectPairingIdError', 'PairingAuthError', 'InvalidAuthTagError', 'InvalidError', + 'InvalidSignatureError', 'MaxPeersError', 'MaxTriesError', 'ProtocolError', 'RequestRejected', 'UnavailableError', + 'UnknownError', 'UnpairedError' ] from homekit.controller import Controller from homekit.exceptions import BluetoothAdapterError, AccessoryDisconnectedError, AccessoryNotFoundError, \ AlreadyPairedError, AuthenticationError, BackoffError, BusyError, CharacteristicPermissionError, \ ConfigLoadingError, ConfigSavingError, ConfigurationError, FormatError, HomeKitException, HttpException, \ - IncorrectPairingIdError, InvalidAuthTagError, InvalidError, InvalidSignatureError, MaxPeersError, MaxTriesError, \ - ProtocolError, RequestRejected, UnavailableError, UnknownError, UnpairedError + IncorrectPairingIdError, PairingAuthError, InvalidAuthTagError, InvalidError, InvalidSignatureError, \ + MaxPeersError, MaxTriesError, ProtocolError, RequestRejected, UnavailableError, UnknownError, UnpairedError from homekit.tools import IP_TRANSPORT_SUPPORTED diff --git a/homekit/controller/ble_impl/__init__.py b/homekit/controller/ble_impl/__init__.py index 39dcfbda..60880274 100644 --- a/homekit/controller/ble_impl/__init__.py +++ b/homekit/controller/ble_impl/__init__.py @@ -592,12 +592,18 @@ def find_characteristic_by_uuid(device, service_uuid, char_uuid): if ServicesTypes.get_short(service_uuid.upper()) == ServicesTypes.get_short(possible_service.uuid.upper()): service_found = possible_service break - logger.debug('searched service: %s', service_found) if not service_found: - logging.error('searched service not found.') + try: + srv_desc = f'{ServicesTypes[service_uuid]} ' + except KeyError: + srv_desc = '' + + logging.error('searched service %snot found.', srv_desc) return None, None + logger.debug('searched service: %s', service_found.uuid) + result_char = None result_char_id = None for characteristic in service_found.characteristics: @@ -612,7 +618,12 @@ def find_characteristic_by_uuid(device, service_uuid, char_uuid): result_char_id = cid if not result_char: - logging.error('searched char not found.') + try: + char_desc = f'{CharacteristicsTypes[char_uuid]} ' + except KeyError: + char_desc = '' + + logging.error('searched char %snot found.', char_desc) return None, None logger.debug('searched char: %s %s', result_char, result_char_id) diff --git a/homekit/controller/ble_impl/device.py b/homekit/controller/ble_impl/device.py index b347dfed..a46beef3 100644 --- a/homekit/controller/ble_impl/device.py +++ b/homekit/controller/ble_impl/device.py @@ -105,14 +105,20 @@ def connect(self): except dbus.exceptions.DBusException: raise AccessoryNotFoundError('Unable to resolve device services + characteristics') + def characteristic_read_value_succeeded(self, characteristic): + logger.debug('read success: %s %s', str(characteristic.uuid)) + def characteristic_read_value_failed(self, characteristic, error): - logger.debug('read failed: %s %s', characteristic, error) + logger.debug('read failed: %s %s', str(characteristic.uuid), error) def characteristic_write_value_succeeded(self, characteristic): - logger.debug('write success: %s', characteristic) + logger.debug('write success: %s', str(characteristic.uuid)) def characteristic_write_value_failed(self, characteristic, error): - logger.debug('write failed: %s %s', characteristic, error) + logger.debug('write failed: %s %s', str(characteristic.uuid), error) + + def descriptor_read_value_failed(self, descriptor, error): + logger.debug('read descriptor failed: %s %s', str(descriptor.uuid), error) class DeviceManager(gatt.DeviceManager): diff --git a/homekit/controller/ble_impl/gatt.py b/homekit/controller/ble_impl/gatt.py index f0d55378..94a2854d 100644 --- a/homekit/controller/ble_impl/gatt.py +++ b/homekit/controller/ble_impl/gatt.py @@ -61,7 +61,7 @@ def read_value(self, offset=0): return val except dbus.exceptions.DBusException as e: error = _error_from_dbus_error(e) - self.service.device.descriptor_read_value_failed(self, error=error) + self.characteristic.service.device.descriptor_read_value_failed(self, error=error) class Characteristic(gatt.Characteristic): diff --git a/homekit/controller/controller.py b/homekit/controller/controller.py index 30802b35..ac46c6f7 100644 --- a/homekit/controller/controller.py +++ b/homekit/controller/controller.py @@ -22,14 +22,18 @@ import re import tlv8 +from enum import IntEnum + from homekit.exceptions import AccessoryNotFoundError, ConfigLoadingError, UnknownError, \ - AuthenticationError, ConfigSavingError, AlreadyPairedError, TransportNotSupportedError, MalformedPinError + AuthenticationError, ConfigSavingError, AlreadyPairedError, TransportNotSupportedError, \ + MalformedPinError, PairingAuthError from homekit.protocol import States, Methods, Errors, TlvTypes from homekit.http_impl import HomeKitHTTPConnection from homekit.protocol.statuscodes import HapStatusCodes from homekit.protocol import perform_pair_setup_part1, perform_pair_setup_part2, create_ip_pair_setup_write from homekit.model.services.service_types import ServicesTypes from homekit.model.characteristics.characteristic_types import CharacteristicsTypes +from homekit.model.feature_flags import FeatureFlags from homekit.protocol.opcodes import HapBleOpCodes from homekit.tools import IP_TRANSPORT_SUPPORTED, BLE_TRANSPORT_SUPPORTED from homekit.controller.tools import NotSupportedPairing @@ -41,7 +45,7 @@ from .ble_impl.discovery import DiscoveryDeviceManager if IP_TRANSPORT_SUPPORTED: - from homekit.zeroconf_impl import discover_homekit_devices, find_device_ip_and_port + from homekit.zeroconf_impl import discover_homekit_devices, find_device_ip_port_props from homekit.controller.ip_implementation import IpPairing, IpSession @@ -60,6 +64,17 @@ def __init__(self, ble_adapter='hci0'): self.ble_adapter = ble_adapter self.logger = logging.getLogger('homekit.controller.Controller') + class PairingAuth(IntEnum): + """ + Types of pairing authentication strategies + Auto: try pairing with hardware authentication , fall back to software authentication if necessary + HwAuth: only try hardware authentication + SwAuth: only try software authentication + """ + Auto = 0 + HwAuth = 1 + SwAuth = 2 + @staticmethod def discover(max_seconds=10): """ @@ -152,7 +167,7 @@ def identify(accessory_id): """ if not IP_TRANSPORT_SUPPORTED: raise TransportNotSupportedError('IP') - connection_data = find_device_ip_and_port(accessory_id) + connection_data = find_device_ip_port_props(accessory_id) if connection_data is None: raise AccessoryNotFoundError('Cannot find accessory with id "{i}".'.format(i=accessory_id)) @@ -324,7 +339,34 @@ def check_pin_format(pin): if not re.match(r'^\d\d\d-\d\d-\d\d\d$', pin): raise MalformedPinError('The pin must be of the following XXX-XX-XXX where X is a digit between 0 and 9.') - def perform_pairing(self, alias, accessory_id, pin): + def _get_pair_method(self, auth_method: PairingAuth, feature_flags: int): + """ + Returns the used pairing method based on the supported features + + A feature flag of 0 will be treated as pairing with software authentication support, to allow uncertified + accessories which are not MFi certified. + + :param auth_method: user specified authentication method + :param feature_flags: represents the supported features. 0 is treated as "software authentication allowed". + :returns: + :raises PairingAuthError: if pairing authentication method is not supported + """ + pair_method = Methods.PairSetup + + if auth_method == Controller.PairingAuth.Auto: + if feature_flags & FeatureFlags.APPLE_MFI_COPROCESSOR: + pair_method = Methods.PairSetupWithAuth + elif auth_method == Controller.PairingAuth.HwAuth: + pair_method = Methods.PairSetupWithAuth + elif auth_method == Controller.PairingAuth.SwAuth: + pass + else: + raise PairingAuthError(f'auth_method: invalid value "{str(auth_method)}"') + + logging.debug('using pairing method %s', pair_method) + return pair_method + + def perform_pairing(self, alias, accessory_id, pin, auth_method=PairingAuth.HwAuth): """ This performs a pairing attempt with the IP accessory identified by its id. @@ -338,6 +380,7 @@ def perform_pairing(self, alias, accessory_id, pin): :param alias: the alias for the accessory in the controllers data :param accessory_id: the accessory's id :param pin: function to return the accessory's pin + :param auth_method: defines the used pairing auth_method :raises AccessoryNotFoundError: if no accessory with the given id can be found :raises AlreadyPairedError: if the alias was already used :raises UnavailableError: if the device is already paired @@ -347,12 +390,14 @@ def perform_pairing(self, alias, accessory_id, pin): :raises MaxPeersError: if the device cannot accept an additional pairing :raises UnavailableError: on wrong pin :raises MalformedPinError: if the pin is malformed + :raises PairingAuthError: if pairing authentication method is not supported """ Controller.check_pin_format(pin) - finish_pairing = self.start_pairing(alias, accessory_id) + + finish_pairing = self.start_pairing(alias, accessory_id, auth_method) return finish_pairing(pin) - def start_pairing(self, alias, accessory_id): + def start_pairing(self, alias, accessory_id, auth_method=PairingAuth.Auto): """ This starts a pairing attempt with the IP accessory identified by its id. It returns a callable (finish_pairing) which you must call with the pairing pin. @@ -368,6 +413,7 @@ def start_pairing(self, alias, accessory_id): :param alias: the alias for the accessory in the controllers data :param accessory_id: the accessory's id :param pin: function to return the accessory's pin + :param auth_method: defines the used pairing auth_method :raises AccessoryNotFoundError: if no accessory with the given id can be found :raises AlreadyPairedError: if the alias was already used :raises UnavailableError: if the device is already paired @@ -376,21 +422,24 @@ def start_pairing(self, alias, accessory_id): :raises AuthenticationError: if the verification of the device's SRP proof fails :raises MaxPeersError: if the device cannot accept an additional pairing :raises UnavailableError: on wrong pin + :raises PairingAuthError: if pairing authentication method is not supported """ if not IP_TRANSPORT_SUPPORTED: raise TransportNotSupportedError('IP') if alias in self.pairings: raise AlreadyPairedError('Alias "{a}" is already paired.'.format(a=alias)) - connection_data = find_device_ip_and_port(accessory_id) + connection_data = find_device_ip_port_props(accessory_id) if connection_data is None: raise AccessoryNotFoundError('Cannot find accessory with id "{i}".'.format(i=accessory_id)) conn = HomeKitHTTPConnection(connection_data['ip'], port=connection_data['port']) + pair_method = self._get_pair_method(auth_method, connection_data['properties']['ff']) + try: write_fun = create_ip_pair_setup_write(conn) - state_machine = perform_pair_setup_part1() + state_machine = perform_pair_setup_part1(pair_method) request, expected = state_machine.send(None) while True: try: @@ -426,7 +475,52 @@ def finish_pairing(pin): return finish_pairing - def perform_pairing_ble(self, alias, accessory_mac, pin, adapter='hci0'): + def _read_feature_flags_ble(self, device) -> int: + """helper function to read out the feature flags of the accessory + :param device: device connected to the accessory used for the request. Should be connected. + :return: 1 byte integer representing the feature flags + """ + pair_features_char, pair_features_char_id = find_characteristic_by_uuid(device, + ServicesTypes.PAIRING_SERVICE, + CharacteristicsTypes.PAIRING_FEATURES) + logging.debug('setup char: %s %s', str(pair_features_char.uuid), pair_features_char.service.device) + + assert pair_features_char and pair_features_char_id, 'pairing feature characteristic not defined' + + # prepare request + transaction_id = random.randrange(0, 255) + wdata = bytearray([0x00, HapBleOpCodes.CHAR_READ, transaction_id]) + wdata.extend(pair_features_char_id.to_bytes(length=2, byteorder='little')) + logging.debug('sent %s', bytes(wdata).hex()) + + logging.debug('reading pairing features') + + pair_features_char.write_value(value=wdata) + rdata = [] + import time + while len(rdata) == 0: + time.sleep(1) + rdata = pair_features_char.read_value() + + resp_data = [b for b in rdata] + expected_length = int.from_bytes(bytes(resp_data[3:5]), byteorder='little') + + logging.debug( + 'control field: {c:x}, tid: {t:x}, status: {s:x}, length: {length}'.format(c=resp_data[0], t=resp_data[1], + s=resp_data[2], + length=expected_length)) + # 3 for uint8 TLV + assert expected_length == 3, 'invalid return length, expecting uint8' + assert transaction_id == resp_data[1], 'transaction id mismatch' + assert HapStatusCodes.SUCCESS == resp_data[2], f'hap status code: {HapStatusCodes[resp_data[2]]}' + + resp_data = tlv8.decode(bytes([int(a) for a in resp_data[5:]]), + expected={AdditionalParameterTypes.Value: tlv8.DataType.INTEGER}) + assert 0 < len(resp_data), 'received data is not an integer' + + return resp_data.first_by_id(AdditionalParameterTypes.Value).data + + def perform_pairing_ble(self, alias, accessory_mac, pin, adapter='hci0', auth_method=PairingAuth.Auto): """ This performs a pairing attempt with the Bluetooth LE accessory identified by its mac address. @@ -442,14 +536,17 @@ def perform_pairing_ble(self, alias, accessory_mac, pin, adapter='hci0'): :param accessory_mac: the accessory's mac address :param pin: function to return the accessory's pin :param adapter: the bluetooth adapter to be used (defaults to hci0) + :param auth_method: defines the used pairing auth_method :raises MalformedPinError: if the pin is malformed + :raises PairingAuthError: if pairing authentication method is not supported # TODO add raised exceptions """ Controller.check_pin_format(pin) - finish_pairing = self.start_pairing_ble(alias, accessory_mac, adapter) + + finish_pairing = self.start_pairing_ble(alias, accessory_mac, adapter, auth_method) return finish_pairing(pin) - def start_pairing_ble(self, alias, accessory_mac, adapter='hci0'): + def start_pairing_ble(self, alias, accessory_mac, adapter='hci0', auth_method=PairingAuth.Auto): """ This starts a pairing attempt with the Bluetooth LE accessory identified by its mac address. It returns a callable (finish_pairing) which you must call with the pairing pin. @@ -464,6 +561,8 @@ def start_pairing_ble(self, alias, accessory_mac, adapter='hci0'): :param alias: the alias for the accessory in the controllers data :param accessory_mac: the accessory's mac address :param adapter: the bluetooth adapter to be used (defaults to hci0) + :param auth_method: defines the used pairing auth_method + :raises PairingAuthError: if pairing authentication method is not supported # TODO add raised exceptions """ if not BLE_TRANSPORT_SUPPORTED: @@ -479,13 +578,18 @@ def start_pairing_ble(self, alias, accessory_mac, adapter='hci0'): device.connect() logging.debug('connected to device') + # --- read pairing features to determine if the authentication method is actually supported + feature_flags = self._read_feature_flags_ble(device) + pair_method = self._get_pair_method(auth_method, feature_flags) + + # --- perform pairing pair_setup_char, pair_setup_char_id = find_characteristic_by_uuid(device, ServicesTypes.PAIRING_SERVICE, CharacteristicsTypes.PAIR_SETUP) - logging.debug('setup char: %s %s', pair_setup_char, pair_setup_char.service.device) + logging.debug('setup char: %s %s', str(pair_setup_char.uuid), pair_setup_char.service.device) write_fun = create_ble_pair_setup_write(pair_setup_char, pair_setup_char_id) - state_machine = perform_pair_setup_part1() + state_machine = perform_pair_setup_part1(pair_method) request, expected = state_machine.send(None) while True: try: diff --git a/homekit/controller/ip_implementation.py b/homekit/controller/ip_implementation.py index 12d0c98a..7875c57b 100644 --- a/homekit/controller/ip_implementation.py +++ b/homekit/controller/ip_implementation.py @@ -30,7 +30,7 @@ from homekit.protocol import get_session_keys, create_ip_pair_verify_write from homekit.protocol import States, Methods, Errors, TlvTypes from homekit.model.characteristics import CharacteristicsTypes -from homekit.zeroconf_impl import find_device_ip_and_port +from homekit.zeroconf_impl import find_device_ip_port_props from homekit.model.services import ServicesTypes @@ -476,7 +476,7 @@ def __init__(self, pairing_data): if not connected: # no connection yet, so ip / port might have changed and we need to fall back to slow zeroconf lookup device_id = pairing_data['AccessoryPairingID'] - connection_data = find_device_ip_and_port(device_id) + connection_data = find_device_ip_port_props(device_id) # update pairing data with the IP/port we elaborated above, perhaps next time they are valid pairing_data['AccessoryIP'] = connection_data['ip'] diff --git a/homekit/exceptions.py b/homekit/exceptions.py index 8415200e..89b230aa 100644 --- a/homekit/exceptions.py +++ b/homekit/exceptions.py @@ -14,7 +14,6 @@ # limitations under the License. # - class HomeKitException(Exception): """Generic HomeKit exception. Attributes: @@ -66,6 +65,14 @@ class AuthenticationError(ProtocolError): pass +class PairingAuthError(ProtocolError): + """ + Raised in on pairing if the current pairing method is not supported. + This can happen if an accessory does not support a specific authentication method. + """ + pass + + class BackoffError(ProtocolError): """ Raised upon receipt of a back off error. It seems unclear when this is raised, must be related to diff --git a/homekit/model/feature_flags.py b/homekit/model/feature_flags.py index 7e11a2cd..adcaa05d 100644 --- a/homekit/model/feature_flags.py +++ b/homekit/model/feature_flags.py @@ -14,22 +14,33 @@ # limitations under the License. # - class _FeatureFlags(object): """ - Data taken form table 5-8 Bonjour TXT Record Feature Flags on page 69. + Data taken from table 5-8 Bonjour TXT Record Feature Flags on page 69. """ + APPLE_MFI_COPROCESSOR = 0x01 + SOFTWARE_MFI_AUTH = 0x02 + def __init__(self): self._data = { - 0: 'No support for HAP Pairing', - 1: 'Supports HAP Pairing' + 0x00: 'No support for HAP Pairing', # this might also be uncertified + self.APPLE_MFI_COPROCESSOR: 'Apple authentication coprocessor', + self.SOFTWARE_MFI_AUTH: 'Software authentication', } - def __getitem__(self, item): - bit_value = item & 0x01 - if bit_value in self._data: - return self._data[bit_value] + def __getitem__(self, item: int) -> str: + data = [] + if 0 != (item & self.APPLE_MFI_COPROCESSOR): + data.append(self._data[self.APPLE_MFI_COPROCESSOR]) + if 0 != (item & self.SOFTWARE_MFI_AUTH): + data.append(self._data[self.SOFTWARE_MFI_AUTH]) + + if data: + return 'Supports HAP Pairing with ' + ' and '.join(data) + elif 0 == item: + # Note: this may change if feature flags will have more flags! + return self._data[0] raise KeyError('Item {item} not found'.format(item=item)) diff --git a/homekit/pair.py b/homekit/pair.py index d945770f..6512bff7 100755 --- a/homekit/pair.py +++ b/homekit/pair.py @@ -32,6 +32,11 @@ def setup_args_parser(): parser.add_argument('-p', action='store', required=False, dest='pin', help='HomeKit configuration code') parser.add_argument('-f', action='store', required=True, dest='file', help='HomeKit pairing data file') parser.add_argument('-a', action='store', required=True, dest='alias', help='alias for the pairing') + parser.add_argument('-s', action='store', required=False, dest='auth_method', choices=['auto', 'hw', 'sw'], + default='auto', help='''authentication method: + auto - determine authentication method automatically (default) + sw - software authentication + hw - hardware authentication (requires Apple MFI coprocessor)''') add_log_arguments(parser) return parser.parse_args() @@ -74,7 +79,7 @@ def tmp(): pin_function = pin_from_keyboard() try: - finish_pairing = controller.start_pairing(args.alias, args.device) + finish_pairing = controller.start_pairing(args.alias, args.device, args.auth_method) finish_pairing(pin_function()) pairing = controller.get_pairings()[args.alias] pairing.list_accessories_and_characteristics() diff --git a/homekit/pair_ble.py b/homekit/pair_ble.py index 6e26d5ad..0f53d6f6 100755 --- a/homekit/pair_ble.py +++ b/homekit/pair_ble.py @@ -32,6 +32,11 @@ def setup_args_parser(): parser.add_argument('-p', action='store', required=False, dest='pin', help='HomeKit configuration code') parser.add_argument('-f', action='store', required=True, dest='file', help='HomeKit pairing data file') parser.add_argument('-a', action='store', required=True, dest='alias', help='alias for the pairing') + parser.add_argument('-s', action='store', required=False, dest='auth_method', choices=['auto', 'hw', 'sw'], + default='auto', help='''authentication method: + auto - determine authentication method automatically (default) + sw - software authentication + hw - hardware authentication (requires Apple MFI coprocessor)''') parser.add_argument('--adapter', action='store', dest='adapter', default='hci0', help='the bluetooth adapter to be used (defaults to hci0)') add_log_arguments(parser) @@ -64,7 +69,7 @@ def setup_args_parser(): try: logging.debug('start pairing') - finish_pairing = controller.start_pairing_ble(args.alias, args.mac, args.adapter) + finish_pairing = controller.start_pairing_ble(args.alias, args.mac, args.adapter, args.auth_method) finish_pairing(pin_function()) pairing = controller.get_pairings()[args.alias] pairing.list_accessories_and_characteristics() diff --git a/homekit/protocol/__init__.py b/homekit/protocol/__init__.py index b74de123..46b1aadc 100644 --- a/homekit/protocol/__init__.py +++ b/homekit/protocol/__init__.py @@ -91,10 +91,11 @@ def write_http(request, expected): return write_http -def perform_pair_setup_part1(): +def perform_pair_setup_part1(pair_method): """ Performs a pair setup operation as described in chapter 4.7 page 39 ff. + :param pair_method: method used for pairing :return: a tuple of salt and server's public key :raises UnavailableError: if the device is already paired :raises MaxTriesError: if the device received more than 100 unsuccessful pairing attempts @@ -110,7 +111,7 @@ def perform_pair_setup_part1(): logging.debug('#1 ios -> accessory: send SRP start request') request_tlv = [ tlv8.Entry(TlvTypes.State, States.M1), - tlv8.Entry(TlvTypes.Method, Methods.PairSetup) + tlv8.Entry(TlvTypes.Method, pair_method) ] step2_expectations = { diff --git a/homekit/protocol/methods.py b/homekit/protocol/methods.py index 9750824a..3ddbb7c8 100644 --- a/homekit/protocol/methods.py +++ b/homekit/protocol/methods.py @@ -16,8 +16,9 @@ class Methods(IntEnum): - # Methods (see table 4-4 page 60) - PairSetup = 1 + # Methods (see open source HomneKit ADK, HAPPairingMethod (HAPPairing.h:56) + PairSetup = 0 + PairSetupWithAuth = 1 PairVerify = 2 AddPairing = 3 RemovePairing = 4 diff --git a/homekit/zeroconf_impl/__init__.py b/homekit/zeroconf_impl/__init__.py index 60647246..8b2c1f33 100644 --- a/homekit/zeroconf_impl/__init__.py +++ b/homekit/zeroconf_impl/__init__.py @@ -183,7 +183,7 @@ def parse_discovery_properties(props): return data -def find_device_ip_and_port(device_id: str, max_seconds=10): +def find_device_ip_port_props(device_id: str, max_seconds=10): """ Try to find a HomeKit Accessory via Bonjour. The process is time boxed by the second parameter which sets an upper limit of `max_seconds` before it times out. The runtime of the function may be longer because of the Bonjour @@ -191,7 +191,7 @@ def find_device_ip_and_port(device_id: str, max_seconds=10): :param device_id: the Accessory's pairing id :param max_seconds: the number of seconds to wait for the accessory to be found - :return: a dict with ip and port if the accessory was found or None + :return: a dict with ip, port and properties if the accessory was found or None """ result = None zeroconf = Zeroconf() @@ -204,7 +204,10 @@ def find_device_ip_and_port(device_id: str, max_seconds=10): data = listener.get_data() for info in data: if info.properties[b'id'].decode() == device_id: - result = {'ip': inet_ntoa(info.addresses[0]), 'port': info.port} + result = {'ip': inet_ntoa(info.addresses[0]), + 'port': info.port, + 'properties': parse_discovery_properties(decode_discovery_properties(info.properties)), + } break counter += 1 diff --git a/tests/__init__.py b/tests/__init__.py index 1eb88f04..3a5cb508 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -18,7 +18,8 @@ 'TestServerData', 'BleCharacteristicFormatsTest', 'BleCharacteristicUnitsTest', 'CharacteristicTypesTest', 'TestBLEController', 'TestChacha20poly1305', 'TestCharacteristicsTypes', 'TestController', 'TestControllerIpPaired', 'TestControllerIpUnpaired', 'TestHttpResponse', 'TestHttpStatusCodes', 'TestMfrData', 'TestSrp', - 'TestZeroconf', 'TestBLEPairing', 'TestServiceTypes', 'TestSecureHttp', 'TestHTTPPairing', 'TestSecureSession' + 'TestZeroconf', 'TestBLEPairing', 'TestServiceTypes', 'TestSecureHttp', 'TestHTTPPairing', 'TestSecureSession', + 'TestFeatureFlags' ] from tests.bleCharacteristicFormats_test import BleCharacteristicFormatsTest @@ -29,6 +30,7 @@ from tests.characteristicTypes_test import CharacteristicTypesTest from tests.characteristicsTypes_test import TestCharacteristicsTypes from tests.controller_test import TestControllerIpPaired, TestControllerIpUnpaired, TestController +from tests.feature_flags_test import TestFeatureFlags from tests.httpStatusCodes_test import TestHttpStatusCodes from tests.http_response_test import TestHttpResponse from tests.regression_test import TestHTTPPairing, TestSecureSession diff --git a/tests/ble_controller_test.py b/tests/ble_controller_test.py index c6f09c21..3e34ba3e 100644 --- a/tests/ble_controller_test.py +++ b/tests/ble_controller_test.py @@ -21,13 +21,17 @@ import logging import tlv8 +from typing import Optional + from homekit.crypto.chacha20poly1305 import chacha20_aead_decrypt, chacha20_aead_encrypt from homekit import Controller from homekit.model import Accessory from homekit.model.characteristics import CharacteristicsTypes from homekit.model.services import ServicesTypes, AbstractService, LightBulbService from homekit.model.characteristics import AbstractCharacteristic +from homekit.model.feature_flags import FeatureFlags from homekit.protocol import States, Methods, TlvTypes +from homekit.protocol.statuscodes import HapStatusCodes from homekit import accessoryserver from homekit.model import mixin as model_mixin from homekit import exceptions @@ -74,7 +78,7 @@ class Device: connected = False - def __init__(self, accessory: Accessory): + def __init__(self, accessory: Accessory, feature_flags: Optional[int] = None): self.accessory = accessory self.name = 'Test' # FIXME get from accessory self.mac_address = '00:00:00:00:00' @@ -98,6 +102,8 @@ def __init__(self, accessory: Accessory): self.services.append(PairingServiceHandler(self)) + self.set_feature_flags(feature_flags) + def set_accessory_keys(self, ltpk, ltsk): self.accessory_ltpk = ltpk self.accessory_ltsk = ltsk @@ -150,6 +156,23 @@ def disconnect(self): def is_connected(self): return self.connected + def set_feature_flags(self, feature_flags: Optional[int]): + pairing_srv = None + for srv in self.services: + if isinstance(srv, PairingServiceHandler): + pairing_srv = srv + break + + for char in pairing_srv.characteristics: + if isinstance(char, PairingFeaturesCharacteristicHandler): + if feature_flags is None: + char.reset_feature_flags() + elif feature_flags > (FeatureFlags.APPLE_MFI_COPROCESSOR | FeatureFlags.SOFTWARE_MFI_AUTH): + raise ValueError('"feature_flags": invalid value') + else: + char.feature_flags = feature_flags + break + class Service: """ @@ -197,6 +220,7 @@ def __init__(self, device): self.characteristics.append(PairingSetupCharacteristicHandler(self)) self.characteristics.append(PairingVerifyCharacteristicHandler(self)) self.characteristics.append(PairingPairingsCharacteristicHandler(self)) + self.characteristics.append(PairingFeaturesCharacteristicHandler(self)) class Characteristic: @@ -530,6 +554,49 @@ def do_char_write(self, tid, value): self.queue_read_response(self.encrypt_value(bytes(response))) +class PairingFeaturesCharacteristicHandler(Characteristic): + """ + This is a fake gatt.Characteristic + + It is intended to handle the special case of reading out the feature flags of a fake accessory. + """ + + def __init__(self, service): + characteristic = CharacteristicEntry( + model_mixin.get_id(), + 'public.hap.characteristic.pairing.features', + 'data', + ) + + super().__init__(service, characteristic) + + self.reset_feature_flags() + + self.rh = AccessoryRequestHandler(self) + self.values = [] + + def reset_feature_flags(self): + self.feature_flags = FeatureFlags.APPLE_MFI_COPROCESSOR | FeatureFlags.SOFTWARE_MFI_AUTH + + def write_value(self, value): + assert value[0] == 0 + opcode = value[1] + + if opcode == HapBleOpCodes.CHAR_READ: + try: + ff_tlv = tlv8.Entry(AdditionalParameterTypes.Value, self.feature_flags) + byte_val = b'\x00' + value[2].to_bytes(length=1, byteorder='little') + \ + int(HapStatusCodes.SUCCESS).to_bytes(length=1, byteorder='little') + \ + int(3).to_bytes(length=2, byteorder='little') + \ + tlv8.encode(tlv8.EntryList([ff_tlv])) + + self.values.append(byte_val) + except Exception as e: + print(repr(e)) + else: + super().write_value(value) + + class Descriptor: """ A fake gatt.Descriptor @@ -701,6 +768,134 @@ def test_pair_unpair(self): self.assertEqual(len(device.peers), 0) self.assertNotIn('test-pairing', c.pairings) + def test_pair_supported_auth(self): + model_mixin.id_counter = 0 + + a = Accessory( + 'test-dev-123', + 'TestCo', + 'Test Dev Pro', + '00000', + 1 + ) + + manager = DeviceManager() + # --- hw auth and software auth + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111') + + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.HwAuth) + + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.SwAuth) + + # --- hw auth only + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.APPLE_MFI_COPROCESSOR) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111') + + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.APPLE_MFI_COPROCESSOR) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.HwAuth) + + # --- sw auth only + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.SOFTWARE_MFI_AUTH) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111') + + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.SOFTWARE_MFI_AUTH) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.SwAuth) + + # --- not certified + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, 0) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111') + + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, 0) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.SwAuth) + + def test_pair_unsupported_auth(self): + model_mixin.id_counter = 0 + + a = Accessory( + 'test-dev-123', + 'TestCo', + 'Test Dev Pro', + '00000', + 1 + ) + + manager = DeviceManager() + + # --- hw auth only + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.APPLE_MFI_COPROCESSOR) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.SwAuth) + + self.assertRaises(exceptions.PairingAuthError) + # --- sw auth only + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, FeatureFlags.SOFTWARE_MFI_AUTH) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.HwAuth) + self.assertRaises(exceptions.PairingAuthError) + + # --- not certified + with mock.patch('homekit.controller.ble_impl.device.DeviceManager') as m: + manager._devices['00:00:00:00:00'] = Device(a, 0) + m.return_value = manager + + c = Controller() + c.perform_pairing_ble('test-pairing', '00:00:00:00:00', '111-11-111', + auth_method=Controller.PairingAuth.HwAuth) + + self.assertRaises(exceptions.PairingAuthError) + def test_list_accessories_and_characteristics(self): model_mixin.id_counter = 0 @@ -841,7 +1036,18 @@ def test_list_accessories_and_characteristics(self): "range": None, "step": None, "type": "00000050-0000-1000-8000-0026BB765291", - "unit": "unknown"} + "unit": "unknown" + }, + { + "iid": 15, + "type": "0000004F-0000-1000-8000-0026BB765291", + "perms": [], + "description": "", + "format": "data", + "unit": "unknown", + "range": None, + "step": None + }, ], "iid": 11, "type": "00000055-0000-1000-8000-0026BB765291" diff --git a/tests/feature_flags_test.py b/tests/feature_flags_test.py index 8224f9d4..fd468f66 100644 --- a/tests/feature_flags_test.py +++ b/tests/feature_flags_test.py @@ -22,14 +22,20 @@ class TestFeatureFlags(unittest.TestCase): def test_no_support_hap_pairing(self): - self.assertEqual(FeatureFlags[0], 'No support for HAP Pairing') + self.assertEqual(FeatureFlags[0x00], 'No support for HAP Pairing') - def test_support_hap_pairing(self): - self.assertEqual(FeatureFlags[1], 'Supports HAP Pairing') + def test_support_hap_pairing_hw(self): + self.assertEqual(0x01, FeatureFlags.APPLE_MFI_COPROCESSOR) + self.assertEqual(FeatureFlags[0x01], 'Supports HAP Pairing with Apple authentication coprocessor') - def test_bug_143(self): - # 0b10 -> 2 means no hap pairing support? - self.assertEqual(FeatureFlags[2], 'No support for HAP Pairing') + def test_support_hap_pairing_sw(self): + self.assertEqual(0x02, FeatureFlags.SOFTWARE_MFI_AUTH) + self.assertEqual(FeatureFlags[0x02], 'Supports HAP Pairing with Software authentication') -# def test_unknown_code(self): -# self.assertRaises(KeyError, FeatureFlags.__getitem__, 99) + def test_support_hap_pairing_hw_sw(self): + self.assertEqual(FeatureFlags[FeatureFlags.APPLE_MFI_COPROCESSOR | FeatureFlags.SOFTWARE_MFI_AUTH], + 'Supports HAP Pairing with Apple authentication coprocessor and Software authentication') + + def test_support_hap_pairing_unknown(self): + with self.assertRaises(KeyError): + FeatureFlags[0x80] diff --git a/tests/zeroconf_test.py b/tests/zeroconf_test.py index e6a20cdc..ca3b5dd3 100644 --- a/tests/zeroconf_test.py +++ b/tests/zeroconf_test.py @@ -18,7 +18,7 @@ from zeroconf import Zeroconf, ServiceInfo import socket -from homekit.zeroconf_impl import find_device_ip_and_port, discover_homekit_devices, get_from_properties +from homekit.zeroconf_impl import find_device_ip_port_props, discover_homekit_devices, get_from_properties class TestZeroconf(unittest.TestCase): @@ -39,7 +39,7 @@ def find_device(desc, result): return test_device def test_find_without_device(self): - result = find_device_ip_and_port('00:00:00:00:00:00', 1) + result = find_device_ip_port_props('00:00:00:00:00:00', 1) self.assertIsNone(result) def test_find_with_device(self): @@ -50,7 +50,7 @@ def test_find_with_device(self): zeroconf.unregister_all_services() zeroconf.register_service(info, allow_name_change=True) - result = find_device_ip_and_port('00:00:02:00:00:02', 10) + result = find_device_ip_port_props('00:00:02:00:00:02', 10) zeroconf.unregister_all_services()