Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

L2CAP cases #57

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions avatar/cases/l2cap_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import logging

from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from avatar import PandoraDevices
from avatar.common import make_bredr_connection
from avatar.common import make_le_connection
from mobly import base_test
from mobly import test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from pandora import host_pb2
from pandora import l2cap_pb2
from typing import Any, Awaitable, Callable, Dict, Literal, Optional, Tuple, Union

CONNECTORS: Dict[
str,
Callable[[avatar.PandoraDevice, avatar.PandoraDevice], Awaitable[Tuple[host_pb2.Connection, host_pb2.Connection]]],
] = {
'Classic': make_bredr_connection,
'LE': make_le_connection,
}

FIXED_CHANNEL_CID = 0x3E
CLASSIC_PSM = 0xFEFF
LE_SPSM = 0xF0


class L2capTest(base_test.BaseTestClass): # type: ignore[misc]
devices: Optional[PandoraDevices] = None

# pandora devices.
dut: PandoraDevice
ref: PandoraDevice

def setup_class(self) -> None:
self.devices = PandoraDevices(self)
self.dut, self.ref, *_ = self.devices

# Enable BR/EDR mode for Bumble devices.
for device in self.devices:
if isinstance(device, BumblePandoraDevice):
device.config.setdefault("classic_enabled", True)

def teardown_class(self) -> None:
if self.devices:
self.devices.stop_all()

@avatar.asynchronous
async def setup_test(self) -> None: # pytype: disable=wrong-arg-types
await asyncio.gather(self.dut.reset(), self.ref.reset())

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_connect(
self,
transport: Union[Literal['Classic'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
)
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_on_connection(
self,
transport: Union[Literal['Classic'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request),
anext(aiter(server)),
)
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)

@avatar.parameterized(
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_disconnect(
self,
transport: Union[Literal['Classic'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
)
assert dut_ref_res.channel and ref_dut_res.channel

await asyncio.gather(
self.dut.aio.l2cap.Disconnect(channel=dut_ref_res.channel),
self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut_res.channel),
)

@avatar.parameterized(
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_wait_disconnection(
self,
transport: Union[Literal['Classic'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
)
assert dut_ref_res.channel and ref_dut_res.channel

await asyncio.gather(
self.ref.aio.l2cap.Disconnect(channel=ref_dut_res.channel),
self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref_res.channel),
)

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_send(
self,
transport: Union[Literal['Classic'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request),
anext(aiter(server)),
)
ref_dut_channel = ref_dut_res.channel
dut_ref_channel = dut_ref_res.channel
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)
assert ref_dut_channel and dut_ref_channel

dut_ref_stream = self.ref.aio.l2cap.Receive(channel=dut_ref_channel)
_send_res, recv_res = await asyncio.gather(
self.dut.aio.l2cap.Send(channel=ref_dut_channel, data=b"The quick brown fox jumps over the lazy dog"),
anext(aiter(dut_ref_stream)),
)
assert recv_res.data
assert_equal(recv_res.data, b"The quick brown fox jumps over the lazy dog")


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
test_runner.main() # type: ignore
6 changes: 3 additions & 3 deletions avatar/cases/le_security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from avatar import PandoraDevices
from avatar import pandora
from avatar import pandora_snippet
from bumble.pairing import PairingConfig
from bumble.pairing import PairingDelegate
from mobly import base_test
Expand Down Expand Up @@ -226,7 +226,7 @@ async def connect_le(
scan.cancel()

# Initiator - LE connect
return await pandora.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type)
return await pandora_snippet.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type)

# Make LE connection.
if connect == 'incoming_connection':
Expand Down Expand Up @@ -376,7 +376,7 @@ def on_done(_: Any) -> None:
assert ref_dut_classic_res.connection
ref_dut_classic = ref_dut_classic_res.connection
else:
ref_dut_classic, _ = await pandora.connect(self.ref, self.dut)
ref_dut_classic, _ = await pandora_snippet.connect(self.ref, self.dut)
# Try to encrypt Classic connection
ref_dut_secure = await self.ref.aio.security.Secure(ref_dut_classic, classic=LEVEL2)
assert_equal(ref_dut_secure.result_variant(), 'success')
Expand Down
10 changes: 5 additions & 5 deletions avatar/cases/security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from avatar import PandoraDevices
from avatar import pandora
from avatar import pandora_snippet
from bumble.hci import HCI_CENTRAL_ROLE
from bumble.hci import HCI_PERIPHERAL_ROLE
from bumble.hci import HCI_Write_Default_Link_Policy_Settings_Command
Expand Down Expand Up @@ -249,16 +249,16 @@ async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:

# Make classic connection.
if connect == 'incoming_connection':
ref_dut, dut_ref = await pandora.connect(initiator=self.ref, acceptor=self.dut)
ref_dut, dut_ref = await pandora_snippet.connect(initiator=self.ref, acceptor=self.dut)
else:
dut_ref, ref_dut = await pandora.connect(initiator=self.dut, acceptor=self.ref)
dut_ref, ref_dut = await pandora_snippet.connect(initiator=self.dut, acceptor=self.ref)

# Retrieve Bumble connection
if isinstance(self.dut, BumblePandoraDevice):
dut_ref_bumble = pandora.get_raw_connection(self.dut, dut_ref)
dut_ref_bumble = pandora_snippet.get_raw_connection(self.dut, dut_ref)
# Role switch.
if isinstance(self.ref, BumblePandoraDevice):
ref_dut_bumble = pandora.get_raw_connection(self.ref, ref_dut)
ref_dut_bumble = pandora_snippet.get_raw_connection(self.ref, ref_dut)
if ref_dut_bumble is not None:
role = {
'against_central': HCI_CENTRAL_ROLE,
Expand Down
67 changes: 67 additions & 0 deletions avatar/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2023 Google LLC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove ?

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from avatar import PandoraDevice
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from pandora.host_pb2 import RANDOM
from pandora.host_pb2 import Connection
from pandora.host_pb2 import DataTypes
from pandora.host_pb2 import OwnAddressType
from typing import Tuple


# Make classic connection task.
async def make_bredr_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
init_res, wait_res = await asyncio.gather(
initiator.aio.host.Connect(address=acceptor.address),
acceptor.aio.host.WaitConnection(address=initiator.address),
)
assert_equal(init_res.result_variant(), 'connection')
assert_equal(wait_res.result_variant(), 'connection')
assert init_res.connection is not None and wait_res.connection is not None
return init_res.connection, wait_res.connection


# Make LE connection task.
async def make_le_connection(
central: PandoraDevice,
peripheral: PandoraDevice,
central_address_type: OwnAddressType = RANDOM,
peripheral_address_type: OwnAddressType = RANDOM,
) -> Tuple[Connection, Connection]:
advertise = peripheral.aio.host.Advertise(
legacy=True,
connectable=True,
own_address_type=peripheral_address_type,
data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)

scan = central.aio.host.Scan(own_address_type=central_address_type)
ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe'))
scan.cancel()

adv_res, conn_res = await asyncio.gather(
anext(aiter(advertise)),
central.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=central_address_type),
)
assert_equal(conn_res.result_variant(), 'connection')
cen_per, per_cen = conn_res.connection, adv_res.connection
assert_is_not_none(cen_per)
assert_is_not_none(per_cen)
assert cen_per, per_cen
advertise.cancel()
return cen_per, per_cen
12 changes: 12 additions & 0 deletions avatar/pandora_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from dataclasses import dataclass
from pandora import host_grpc
from pandora import host_grpc_aio
from pandora import l2cap_grpc
from pandora import l2cap_grpc_aio
from pandora import security_grpc
from pandora import security_grpc_aio
from typing import Any, Dict, MutableMapping, Optional, Tuple, Union
Expand Down Expand Up @@ -152,6 +154,11 @@ def security_storage(self) -> security_grpc.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc.SecurityStorage(self.channel)

@property
def l2cap(self) -> l2cap_grpc.L2CAP:
"""Returns the Pandora SecurityStorage gRPC interface."""
return l2cap_grpc.L2CAP(self.channel)

@dataclass
class Aio:
channel: grpc.aio.Channel
Expand All @@ -171,6 +178,11 @@ def security_storage(self) -> security_grpc_aio.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc_aio.SecurityStorage(self.channel)

@property
def l2cap(self) -> l2cap_grpc_aio.L2CAP:
"""Returns the Pandora SecurityStorage gRPC interface."""
return l2cap_grpc_aio.L2CAP(self.channel)

@property
def aio(self) -> 'PandoraClient.Aio':
if not self._aio:
Expand Down
File renamed without changes.