Skip to content

Commit

Permalink
peer unit tests: Finish off unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sjlongland committed May 8, 2024
1 parent 76f1aae commit 3152c52
Show file tree
Hide file tree
Showing 2 changed files with 345 additions and 1 deletion.
7 changes: 6 additions & 1 deletion aioax25/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,15 @@ def _on_negotiate_result(self, response, **kwargs):
self._process_xid_winszrx(AX25_20_DEFAULT_XID_WINDOWSZRX)
self._process_xid_acktimer(AX25_20_DEFAULT_XID_ACKTIMER)
self._process_xid_retrycounter(AX25_20_DEFAULT_XID_RETRIES)

# Downgrade 2.2 to 2.0, do not unwittingly "upgrade" 1.0 to 2.0!
if self._protocol in (AX25Version.UNKNOWN, AX25Version.AX25_22):
self._log.info("Downgrading to AX.25 2.0 due to failed XID")
self._protocol = AX25Version.AX25_20
self._modulo128 = False

# AX.25 2.2 is required for Mod128, so if we get FRMR here,
# disable this unconditionally.
self._modulo128 = False
elif self._protocol != AX25Version.AX25_22:
# Clearly this station understands AX.25 2.2
self._log.info("Upgrading to AX.25 2.2 due to successful XID")
Expand Down
339 changes: 339 additions & 0 deletions tests/test_peer/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from aioax25.peer import AX25PeerState
from .peer import TestingAX25Peer
from ..mocks import DummyStation, DummyTimeout
from functools import partial

from pytest import mark

# Connection establishment

Expand Down Expand Up @@ -98,6 +101,100 @@ def _negotiate(*args, **kwargs):
pass


def test_on_incoming_connect_timeout_incoming():
"""
Test if the application does not accept within the time-out, we reject the
connection.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
)

count = dict(reject=0)

def _reject():
count["reject"] += 1

peer.reject = _reject

peer._state = AX25PeerState.INCOMING_CONNECTION
peer._ack_timeout_handle = DummyTimeout(None, None)

peer._on_incoming_connect_timeout()

assert peer._ack_timeout_handle is None
assert count == dict(reject=1)


def test_on_incoming_connect_timeout_otherstate():
"""
Test if the incoming connection time-out fires whilst in another state, it
is ignored
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
)

count = dict(reject=0)

def _reject():
count["reject"] += 1

peer.reject = _reject

peer._state = AX25PeerState.CONNECTED
peer._ack_timeout_handle = DummyTimeout(None, None)

peer._on_incoming_connect_timeout()

assert peer._ack_timeout_handle is not None
assert count == dict(reject=0)


def test_on_connect_response_ack():
"""
Test if _on_connect_response receives an ACK, we enter the connected
state.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
)

peer._state = AX25PeerState.CONNECTING

peer._on_connect_response(response="ack")

assert peer._state == AX25PeerState.CONNECTED


def test_on_connect_response_other():
"""
Test if _on_connect_response receives another response, we enter the
disconnected state.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
)

peer._state = AX25PeerState.CONNECTING

peer._on_connect_response(response="nope")

assert peer._state == AX25PeerState.DISCONNECTED


# SABM(E) transmission


Expand Down Expand Up @@ -3123,3 +3220,245 @@ def test_stop_ack_timer_notexisting():
peer._ack_timeout_handle = None

peer._stop_ack_timer()


# AX.25 2.2 XID negotiation


@mark.parametrize("version", [AX25Version.AX25_10, AX25Version.AX25_20])
def test_negotiate_notsupported(version):
"""
Test the peer refuses to perform XID if the protocol does not support it.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

peer._state = AX25PeerState.CONNECTING
peer._protocol = version

try:
peer._negotiate(lambda **kwa: None)
assert False, "Should not have worked"
except RuntimeError as e:
assert str(e) == "%s does not support negotiation" % (version.value)


@mark.parametrize("version", [AX25Version.AX25_22, AX25Version.UNKNOWN])
def test_negotiate_supported(version):
"""
Test the peer refuses to perform XID if the protocol does not support it.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub XID transmission
count = dict(send_xid=0)

def _send_xid(cr):
count["send_xid"] += 1

peer._send_xid = _send_xid

peer._state = AX25PeerState.CONNECTING
peer._protocol = version

peer._negotiate(lambda **kwa: None)

# Check we actually did request a XID transmission
assert count == dict(send_xid=1)

# Trigger the DM callback to abort time-outs
assert peer._dmframe_handler is not None
peer._dmframe_handler()


@mark.parametrize(
"version, response",
[
(axver, res)
for axver in (AX25Version.AX25_22, AX25Version.UNKNOWN)
for res in ("frmr", "dm")
],
)
def test_on_negotiate_result_unsupported(version, response):
"""
Test we handle a response that indicates an AX.25 2.0 or earlier station.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub XID functions
xid_params = set()

def _set_xid_param(param, value):
xid_params.add(param)

for param in (
"cop",
"hdlcoptfunc",
"ifieldlenrx",
"winszrx",
"acktimer",
"retrycounter",
):
setattr(
peer, "_process_xid_%s" % param, partial(_set_xid_param, param)
)

assert peer._negotiated == False
peer._modulo128 = True
peer._protocol = version

peer._on_negotiate_result(response=response)

# Should downgrade to AX.25 2.0
assert peer._negotiated is True
assert peer._modulo128 is False
assert peer._protocol is AX25Version.AX25_20

# Should have inserted defaults for AX.25 2.0
assert xid_params == set(
[
"acktimer",
"cop",
"hdlcoptfunc",
"ifieldlenrx",
"retrycounter",
"winszrx",
]
)


@mark.parametrize(
"version, response",
[
(axver, res)
for axver in (AX25Version.AX25_20, AX25Version.AX25_10)
for res in ("frmr", "dm")
],
)
def test_on_negotiate_result_unsupported_old(version, response):
"""
Test we do not accidentally "upgrade" on FRMR/DM in response to XID.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub XID functions
xid_params = set()

def _set_xid_param(param, value):
xid_params.add(param)

for param in (
"cop",
"hdlcoptfunc",
"ifieldlenrx",
"winszrx",
"acktimer",
"retrycounter",
):
setattr(
peer, "_process_xid_%s" % param, partial(_set_xid_param, param)
)

assert peer._negotiated == False
peer._modulo128 = True
peer._protocol = version

peer._on_negotiate_result(response=response)

# Should leave this as is!
assert peer._protocol is version

# Should disable AX.25 2.2 features
assert peer._negotiated is True
assert peer._modulo128 is False

# Should have inserted defaults for AX.25 2.0
assert xid_params == set(
[
"acktimer",
"cop",
"hdlcoptfunc",
"ifieldlenrx",
"retrycounter",
"winszrx",
]
)


@mark.parametrize(
"version",
[
AX25Version.UNKNOWN,
AX25Version.AX25_22,
AX25Version.AX25_20,
AX25Version.AX25_10,
],
)
def test_on_negotiate_result_success(version):
"""
Test we upgrade to AX.25 2.2 if XID successful.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub XID functions
xid_params = set()

def _set_xid_param(param, value):
xid_params.add(param)

for param in (
"cop",
"hdlcoptfunc",
"ifieldlenrx",
"winszrx",
"acktimer",
"retrycounter",
):
setattr(
peer, "_process_xid_%s" % param, partial(_set_xid_param, param)
)

assert peer._negotiated == False
peer._modulo128 = True
peer._protocol = version

peer._on_negotiate_result(response="success")

# Should bump to AX.25 2.2
assert peer._protocol is AX25Version.AX25_22

# Should leave AX.25 2.2 features enabled
assert peer._negotiated is True
assert peer._modulo128 is True

# Should not override XID parameters set by handler
assert xid_params == set([])

0 comments on commit 3152c52

Please sign in to comment.