Skip to content

Commit

Permalink
additional test
Browse files Browse the repository at this point in the history
  • Loading branch information
dmulcahey committed Nov 6, 2024
1 parent abafba8 commit 1adabf4
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ def _update_last_seen(*args, **kwargs): # pylint: disable=unused-argument
"zha.zigbee.cluster_handlers.general.BasicClusterHandler.async_initialize",
new=mock.AsyncMock(),
)
@pytest.mark.parametrize(
"zha_gateway",
[
"zha_gateway",
"ws_gateways",
],
indirect=True,
)
async def test_check_available_unsuccessful(
zha_gateway: Gateway,
) -> None:
Expand All @@ -257,59 +265,78 @@ async def test_check_available_unsuccessful(
zha_device = await join_zigpy_device(zha_gateway, device_with_basic_cluster_handler)
basic_ch = device_with_basic_cluster_handler.endpoints[3].basic

if hasattr(zha_gateway, "ws_gateway"):
server_device = zha_gateway.ws_gateway.devices[zha_device.ieee]
server_gateway = zha_gateway.ws_gateway
else:
server_device = zha_device
server_gateway = zha_gateway

assert zha_device.available is True
assert basic_ch.read_attributes.await_count == 0

device_with_basic_cluster_handler.last_seen = (
time.time() - zha_device.consider_unavailable_time - 2
time.time() - server_device.consider_unavailable_time - 2
)

for entity in zha_device.platform_entities.values():
for entity in server_device.platform_entities.values():
entity.emit = mock.MagicMock(wraps=entity.emit)

# we want to test the device availability handling alone
zha_gateway.global_updater.stop()
server_gateway.global_updater.stop()

# unsuccessfully ping zigpy device, but zha_device is still available
await _send_time_changed(
zha_gateway, zha_gateway._device_availability_checker.__polling_interval + 1
zha_gateway, server_gateway._device_availability_checker.__polling_interval + 1
)

assert basic_ch.read_attributes.await_count == 1
assert basic_ch.read_attributes.await_args[0][0] == ["manufacturer"]
assert zha_device.available is True

for entity in zha_device.platform_entities.values():
for entity in server_device.platform_entities.values():
entity.emit.assert_not_called()
assert entity.available
if server_device != zha_device:
assert zha_device.platform_entities[
(entity.PLATFORM, entity.unique_id)
].available
entity.emit.reset_mock()

# still no traffic, but zha_device is still available
await _send_time_changed(
zha_gateway, zha_gateway._device_availability_checker.__polling_interval + 1
zha_gateway, server_gateway._device_availability_checker.__polling_interval + 1
)

assert basic_ch.read_attributes.await_count == 2
assert basic_ch.read_attributes.await_args[0][0] == ["manufacturer"]
assert zha_device.available is True

for entity in zha_device.platform_entities.values():
for entity in server_device.platform_entities.values():
entity.emit.assert_not_called()
assert entity.available
if server_device != zha_device:
assert zha_device.platform_entities[
(entity.PLATFORM, entity.unique_id)
].available
entity.emit.reset_mock()

# not even trying to update, device is unavailable
await _send_time_changed(
zha_gateway, zha_gateway._device_availability_checker.__polling_interval + 1
zha_gateway, server_gateway._device_availability_checker.__polling_interval + 1
)

assert basic_ch.read_attributes.await_count == 2
assert basic_ch.read_attributes.await_args[0][0] == ["manufacturer"]
assert zha_device.available is False

for entity in zha_device.platform_entities.values():
for entity in server_device.platform_entities.values():
entity.emit.assert_called()
assert not entity.available
if server_device != zha_device:
assert not zha_device.platform_entities[
(entity.PLATFORM, entity.unique_id)
].available
entity.emit.reset_mock()


Expand Down

0 comments on commit 1adabf4

Please sign in to comment.