From 6aa797253c872114917c18418f87d8966f257095 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Tue, 15 Aug 2023 15:15:10 +0800 Subject: [PATCH] L2CAP cases --- avatar/cases/l2cap_test.py | 223 ++++++++++++++++++++++ avatar/cases/le_security_test.py | 6 +- avatar/cases/security_test.py | 10 +- avatar/common.py | 64 +++++++ avatar/pandora_client.py | 12 ++ avatar/{pandora.py => pandora_snippet.py} | 0 6 files changed, 307 insertions(+), 8 deletions(-) create mode 100644 avatar/cases/l2cap_test.py create mode 100644 avatar/common.py rename avatar/{pandora.py => pandora_snippet.py} (100%) diff --git a/avatar/cases/l2cap_test.py b/avatar/cases/l2cap_test.py new file mode 100644 index 0000000..75f9ef9 --- /dev/null +++ b/avatar/cases/l2cap_test.py @@ -0,0 +1,223 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import avatar +import logging + +from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices +from avatar.common import make_bredr_connection, make_le_connection +from mobly import base_test, test_runner +from mobly.asserts import assert_equal # type: ignore +from mobly.asserts import assert_is_not_none # type: ignore +from pandora import host_pb2 +from pandora import l2cap_pb2 +from typing import Any, Dict, Literal, Optional, Union, Callable, Tuple, Awaitable + +CONNECTORS: Dict[ + str, + Callable[[avatar.PandoraDevice, avatar.PandoraDevice], Awaitable[Tuple[host_pb2.Connection, host_pb2.Connection]]], +] = { + 'Classic': make_bredr_connection, + 'LE': make_le_connection, +} + +FIXED_CHANNEL_CID = 0x3E +CLASSIC_PSM = 0xFEFF +LE_SPSM = 0xF0 + + +class L2capTest(base_test.BaseTestClass): # type: ignore[misc] + devices: Optional[PandoraDevices] = None + + # pandora devices. + dut: PandoraDevice + ref: PandoraDevice + + def setup_class(self) -> None: + self.devices = PandoraDevices(self) + self.dut, self.ref, *_ = self.devices + + # Enable BR/EDR mode for Bumble devices. + for device in self.devices: + if isinstance(device, BumblePandoraDevice): + device.config.setdefault("classic_enabled", True) + + def teardown_class(self) -> None: + if self.devices: + self.devices.stop_all() + + @avatar.asynchronous + async def setup_test(self) -> None: # pytype: disable=wrong-arg-types + await asyncio.gather(self.dut.reset(), self.ref.reset()) + + @avatar.parameterized( + ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ( + 'LE', + dict( + le_credit_based=l2cap_pb2.CreditBasedChannelRequest( + spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 + ) + ), + ), + ) # type: ignore[misc] + @avatar.asynchronous + async def test_connect( + self, + transport: Union[Literal['Classic'], Literal['LE']], + request: Dict[str, Any], + ) -> None: + dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) + server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) + ref_dut_res, dut_ref_res = await asyncio.gather( + anext(aiter(server)), + self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + ) + assert_is_not_none(ref_dut_res.channel) + assert_is_not_none(dut_ref_res.channel) + + @avatar.parameterized( + ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ( + 'LE', + dict( + le_credit_based=l2cap_pb2.CreditBasedChannelRequest( + spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 + ) + ), + ), + ) # type: ignore[misc] + @avatar.asynchronous + async def test_on_connection( + self, + transport: Union[Literal['Classic'], Literal['LE']], + request: Dict[str, Any], + ) -> None: + dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) + server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request) + ref_dut_res, dut_ref_res = await asyncio.gather( + self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request), + anext(aiter(server)), + ) + assert_is_not_none(ref_dut_res.channel) + assert_is_not_none(dut_ref_res.channel) + + @avatar.parameterized( + ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ( + 'LE', + dict( + le_credit_based=l2cap_pb2.CreditBasedChannelRequest( + spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 + ) + ), + ), + ) # type: ignore[misc] + @avatar.asynchronous + async def test_disconnect( + self, + transport: Union[Literal['Classic'], Literal['LE']], + request: Dict[str, Any], + ) -> None: + dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) + server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) + ref_dut_res, dut_ref_res = await asyncio.gather( + anext(aiter(server)), + self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + ) + assert dut_ref_res.channel and ref_dut_res.channel + + await asyncio.gather( + self.dut.aio.l2cap.Disconnect(channel=dut_ref_res.channel), + self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut_res.channel), + ) + + @avatar.parameterized( + ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ( + 'LE', + dict( + le_credit_based=l2cap_pb2.CreditBasedChannelRequest( + spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 + ) + ), + ), + ) # type: ignore[misc] + @avatar.asynchronous + async def test_wait_disconnection( + self, + transport: Union[Literal['Classic'], Literal['LE']], + request: Dict[str, Any], + ) -> None: + dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) + server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) + ref_dut_res, dut_ref_res = await asyncio.gather( + anext(aiter(server)), + self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + ) + assert dut_ref_res.channel and ref_dut_res.channel + + await asyncio.gather( + self.ref.aio.l2cap.Disconnect(channel=ref_dut_res.channel), + self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref_res.channel), + ) + + @avatar.parameterized( + ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), + ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ( + 'LE', + dict( + le_credit_based=l2cap_pb2.CreditBasedChannelRequest( + spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 + ) + ), + ), + ) # type: ignore[misc] + @avatar.asynchronous + async def test_send( + self, + transport: Union[Literal['Classic'], Literal['LE']], + request: Dict[str, Any], + ) -> None: + dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) + server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request) + ref_dut_res, dut_ref_res = await asyncio.gather( + self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request), + anext(aiter(server)), + ) + ref_dut_channel = ref_dut_res.channel + dut_ref_channel = dut_ref_res.channel + assert_is_not_none(ref_dut_res.channel) + assert_is_not_none(dut_ref_res.channel) + assert ref_dut_channel and dut_ref_channel + + dut_ref_stream = self.ref.aio.l2cap.Receive(channel=dut_ref_channel) + _send_res, recv_res = await asyncio.gather( + self.dut.aio.l2cap.Send(channel=ref_dut_channel, data=b"The quick brown fox jumps over the lazy dog"), + anext(aiter(dut_ref_stream)), + ) + assert recv_res.data + assert_equal(recv_res.data, b"The quick brown fox jumps over the lazy dog") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + test_runner.main() # type: ignore diff --git a/avatar/cases/le_security_test.py b/avatar/cases/le_security_test.py index b91d8c7..aa2b6d6 100644 --- a/avatar/cases/le_security_test.py +++ b/avatar/cases/le_security_test.py @@ -20,7 +20,7 @@ from avatar import BumblePandoraDevice from avatar import PandoraDevice from avatar import PandoraDevices -from avatar import pandora +from avatar import pandora_snippet from bumble.pairing import PairingConfig from bumble.pairing import PairingDelegate from mobly import base_test @@ -226,7 +226,7 @@ async def connect_le( scan.cancel() # Initiator - LE connect - return await pandora.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type) + return await pandora_snippet.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type) # Make LE connection. if connect == 'incoming_connection': @@ -376,7 +376,7 @@ def on_done(_: Any) -> None: assert ref_dut_classic_res.connection ref_dut_classic = ref_dut_classic_res.connection else: - ref_dut_classic, _ = await pandora.connect(self.ref, self.dut) + ref_dut_classic, _ = await pandora_snippet.connect(self.ref, self.dut) # Try to encrypt Classic connection ref_dut_secure = await self.ref.aio.security.Secure(ref_dut_classic, classic=LEVEL2) assert_equal(ref_dut_secure.result_variant(), 'success') diff --git a/avatar/cases/security_test.py b/avatar/cases/security_test.py index 5213119..235e1e9 100644 --- a/avatar/cases/security_test.py +++ b/avatar/cases/security_test.py @@ -20,7 +20,7 @@ from avatar import BumblePandoraDevice from avatar import PandoraDevice from avatar import PandoraDevices -from avatar import pandora +from avatar import pandora_snippet from bumble.hci import HCI_CENTRAL_ROLE from bumble.hci import HCI_PERIPHERAL_ROLE from bumble.hci import HCI_Write_Default_Link_Policy_Settings_Command @@ -249,16 +249,16 @@ async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: # Make classic connection. if connect == 'incoming_connection': - ref_dut, dut_ref = await pandora.connect(initiator=self.ref, acceptor=self.dut) + ref_dut, dut_ref = await pandora_snippet.connect(initiator=self.ref, acceptor=self.dut) else: - dut_ref, ref_dut = await pandora.connect(initiator=self.dut, acceptor=self.ref) + dut_ref, ref_dut = await pandora_snippet.connect(initiator=self.dut, acceptor=self.ref) # Retrieve Bumble connection if isinstance(self.dut, BumblePandoraDevice): - dut_ref_bumble = pandora.get_raw_connection(self.dut, dut_ref) + dut_ref_bumble = pandora_snippet.get_raw_connection(self.dut, dut_ref) # Role switch. if isinstance(self.ref, BumblePandoraDevice): - ref_dut_bumble = pandora.get_raw_connection(self.ref, ref_dut) + ref_dut_bumble = pandora_snippet.get_raw_connection(self.ref, ref_dut) if ref_dut_bumble is not None: role = { 'against_central': HCI_CENTRAL_ROLE, diff --git a/avatar/common.py b/avatar/common.py new file mode 100644 index 0000000..8d97d74 --- /dev/null +++ b/avatar/common.py @@ -0,0 +1,64 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +from avatar import PandoraDevice +from mobly.asserts import assert_equal # type: ignore +from mobly.asserts import assert_is_not_none # type: ignore +from pandora.host_pb2 import RANDOM, Connection, DataTypes, OwnAddressType +from typing import Tuple + + +# Make classic connection task. +async def make_bredr_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: + init_res, wait_res = await asyncio.gather( + initiator.aio.host.Connect(address=acceptor.address), + acceptor.aio.host.WaitConnection(address=initiator.address), + ) + assert_equal(init_res.result_variant(), 'connection') + assert_equal(wait_res.result_variant(), 'connection') + assert init_res.connection is not None and wait_res.connection is not None + return init_res.connection, wait_res.connection + + +# Make LE connection task. +async def make_le_connection( + central: PandoraDevice, + peripheral: PandoraDevice, + central_address_type: OwnAddressType = RANDOM, + peripheral_address_type: OwnAddressType = RANDOM, +) -> Tuple[Connection, Connection]: + advertise = peripheral.aio.host.Advertise( + legacy=True, + connectable=True, + own_address_type=peripheral_address_type, + data=DataTypes(manufacturer_specific_data=b'pause cafe'), + ) + + scan = central.aio.host.Scan(own_address_type=central_address_type) + ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe')) + scan.cancel() + + adv_res, conn_res = await asyncio.gather( + anext(aiter(advertise)), + central.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=central_address_type), + ) + assert_equal(conn_res.result_variant(), 'connection') + cen_per, per_cen = conn_res.connection, adv_res.connection + assert_is_not_none(cen_per) + assert_is_not_none(per_cen) + assert cen_per, per_cen + advertise.cancel() + return cen_per, per_cen diff --git a/avatar/pandora_client.py b/avatar/pandora_client.py index 98211c6..fd4eb33 100644 --- a/avatar/pandora_client.py +++ b/avatar/pandora_client.py @@ -33,6 +33,8 @@ from pandora import host_grpc_aio from pandora import security_grpc from pandora import security_grpc_aio +from pandora import l2cap_grpc +from pandora import l2cap_grpc_aio from typing import Any, Dict, MutableMapping, Optional, Tuple, Union @@ -152,6 +154,11 @@ def security_storage(self) -> security_grpc.SecurityStorage: """Returns the Pandora SecurityStorage gRPC interface.""" return security_grpc.SecurityStorage(self.channel) + @property + def l2cap(self) -> l2cap_grpc.L2CAP: + """Returns the Pandora SecurityStorage gRPC interface.""" + return l2cap_grpc.L2CAP(self.channel) + @dataclass class Aio: channel: grpc.aio.Channel @@ -171,6 +178,11 @@ def security_storage(self) -> security_grpc_aio.SecurityStorage: """Returns the Pandora SecurityStorage gRPC interface.""" return security_grpc_aio.SecurityStorage(self.channel) + @property + def l2cap(self) -> l2cap_grpc_aio.L2CAP: + """Returns the Pandora SecurityStorage gRPC interface.""" + return l2cap_grpc_aio.L2CAP(self.channel) + @property def aio(self) -> 'PandoraClient.Aio': if not self._aio: diff --git a/avatar/pandora.py b/avatar/pandora_snippet.py similarity index 100% rename from avatar/pandora.py rename to avatar/pandora_snippet.py