From 00e547ce62ba3071522fa313e49cf24cf61696b9 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Tue, 24 Oct 2023 20:19:11 +0800 Subject: [PATCH] Fix directed advertising --- avatar/cases/le_host_test.py | 50 +++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/avatar/cases/le_host_test.py b/avatar/cases/le_host_test.py index 8af96fa..f6b6e6c 100644 --- a/avatar/cases/le_host_test.py +++ b/avatar/cases/le_host_test.py @@ -23,6 +23,8 @@ from avatar import BumblePandoraDevice from avatar import PandoraDevice from avatar import PandoraDevices +from avatar import aio +from avatar import pandora_snippet from mobly import base_test from mobly import test_runner from mobly.asserts import assert_equal # type: ignore @@ -35,6 +37,7 @@ from pandora.host_pb2 import Connection from pandora.host_pb2 import DataTypes from pandora.host_pb2 import OwnAddressType +from pandora.security_pb2 import LE_LEVEL3 from typing import Any, Dict, Literal, Optional, Union @@ -67,6 +70,7 @@ def setup_class(self) -> None: for device in self.devices: if isinstance(device, BumblePandoraDevice): device.config.setdefault('classic_enabled', True) + device.config.setdefault('address_resolution_offload', True) def teardown_class(self) -> None: if self.devices: @@ -105,6 +109,45 @@ def test_scan( scan_response_data = DataTypes() if scannable == 'scannable' else None target = self.dut.address if directed == 'directed' else None + async def setup_pairing( + initiator: PandoraDevice, + acceptor: PandoraDevice, + initiator_addr_type: OwnAddressType, + acceptor_addr_type: OwnAddressType, + ) -> None: + # Acceptor - Advertise + advertisement = acceptor.aio.host.Advertise( + legacy=True, + connectable=True, + own_address_type=acceptor_addr_type, + data=DataTypes(manufacturer_specific_data=b'pause cafe'), + ) + + # Initiator - Scan and fetch the address + scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type) + acceptor_scan = await anext( + (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data) + ) # pytype: disable=name-error + scan.cancel() + + ini_cer, cer_ini = await pandora_snippet.connect_le( + initiator, advertisement, acceptor_scan, initiator_addr_type + ) + + await asyncio.gather( + initiator.aio.security.Secure(connection=ini_cer, le=LE_LEVEL3), + acceptor.aio.security.WaitSecurity(connection=cer_ini, le=LE_LEVEL3), + ) + + await asyncio.gather( + initiator.aio.host.Disconnect(connection=ini_cer), + acceptor.aio.host.WaitDisconnection(connection=cer_ini), + ) + + if directed == 'directed' and self.dut.name == 'android': + # Android cannot advertise with Public address, so devices must be paired and enable resolution to perform directed advertisement + aio.run_until_complete(setup_pairing(self.dut, self.ref, RANDOM, RANDOM)) + advertise = self.ref.host.Advertise( legacy=True, connectable=is_connectable, @@ -114,7 +157,12 @@ def test_scan( own_address_type=PUBLIC, ) - scan = self.dut.host.Scan(legacy=False, passive=False, timeout=self.scan_timeout) + scan = self.dut.host.Scan( + legacy=False, + passive=False, + own_address_type=RANDOM if self.dut.name == 'android' else PUBLIC, + timeout=self.scan_timeout, + ) report = next((x for x in scan if x.public == self.ref.address)) try: report = next((x for x in scan if x.public == self.ref.address))