Skip to content

Commit

Permalink
Merge pull request #40 from sgfeniex/feature/add-enum-management-group
Browse files Browse the repository at this point in the history
feat: Enumeration Management Group
  • Loading branch information
JPHutchins authored Nov 26, 2024
2 parents 43ee844 + fcca5aa commit 0645aab
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/enumeration_management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enumeration Management

::: smp.enumeration_management
1 change: 1 addition & 0 deletions mkdocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ nav:
- Settings Management: settings_management.md
- Shell Management: shell_management.md
- Statistics Management: statistics_management.md
- Enumeration Management: enumeration_management.md
- Zephyr Management: zephyr_management.md
- User Groups:
- Intercreate: user/intercreate.md
Expand Down
129 changes: 129 additions & 0 deletions smp/enumeration_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""The Simple Management Protocol (SMP) Enumeration Management group."""

from __future__ import annotations

from enum import IntEnum, unique
from typing import Tuple, Union

from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated

import smp.error as smperr
import smp.header as smphdr
import smp.message as smpmsg

# We can't 'or' types until a later python
GroupIdField = Annotated[
Union[smphdr.GroupId, smphdr.UserGroupId, int], Field(union_mode="left_to_right")
]


class GroupCountRequest(smpmsg.ReadRequest):
"""Read the number of SMP server groups."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_COUNT


class GroupCountResponse(smpmsg.ReadResponse):
"""SMP group count response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_COUNT

count: int


class ListOfGroupsRequest(smpmsg.ReadRequest):
"""List the available SMP groups."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.LIST_OF_GROUPS


class ListOfGroupsResponse(smpmsg.ReadResponse):
"""SMP group list response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.LIST_OF_GROUPS

groups: Tuple[GroupIdField, ...]


class GroupIdRequest(smpmsg.ReadRequest):
"""List a SMP group by index."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_ID

index: int | None = None


class GroupIdResponse(smpmsg.ReadResponse):
"""SMP group at index response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_ID

group: GroupIdField
end: bool | None = None


class GroupDetailsRequest(smpmsg.ReadRequest):
"""List a SMP group by index."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_DETAILS

groups: Tuple[GroupIdField, ...]


class GroupDetails(BaseModel):
"""Group Details"""

model_config = ConfigDict(extra="forbid", frozen=True)

id: GroupIdField
name: str | None = None
handlers: int | None = None


class GroupDetailsResponse(smpmsg.ReadResponse):
"""SMP group details response."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
_COMMAND_ID = smphdr.CommandId.EnumManagement.GROUP_DETAILS

groups: Tuple[GroupDetails, ...]


@unique
class ENUM_MGMT_ERR(IntEnum):
"""Return codes for the enumeration management group."""

OK = 0
"""No error, this is implied if there is no ret value in the response."""

UNKNOWN = 1
"""Unknown error occurred."""

ERR_TOO_MANY_GROUP_ENTRIES = 2
"""Too many entries were provided."""

ERR_INSUFFICIENT_HEAP_FOR_ENTRIES = 3
"""Insufficient heap memory to store entry data."""

ENUM_MGMT_ERR_INDEX_TOO_LARGE = 4
"""Provided index is larger than the number of supported groups."""


class EnumManagementErrorV1(smperr.ErrorV1):
"""Error response to a enumeration management command."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT


class EnumManagementErrorV2(smperr.ErrorV2[ENUM_MGMT_ERR]):
"""Error response to a enumeration management command."""

_GROUP_ID = smphdr.GroupId.ENUM_MANAGEMENT
8 changes: 8 additions & 0 deletions smp/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class FileManagement(IntEnum):
SUPPORTED_FILE_HASH_CHECKSUM_TYPES = 3
FILE_CLOSE = 4

@unique
class EnumManagement(IntEnum):
GROUP_COUNT = 0
LIST_OF_GROUPS = 1
GROUP_ID = 2
GROUP_DETAILS = 3

@unique
class ZephyrManagement(IntEnum):
ERASE_STORAGE = 0
Expand All @@ -80,6 +87,7 @@ class GroupId(IntEnum):
TEST_CRASH = 7
FILE_MANAGEMENT = 8
SHELL_MANAGEMENT = 9
ENUM_MANAGEMENT = 10
ZEPHYR_MANAGEMENT = 63


Expand Down
152 changes: 152 additions & 0 deletions tests/test_enumeration_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Test the SMP Enumeration Management group."""

from __future__ import annotations

from typing import Any, Dict, Type, TypeVar

import cbor2
import pytest
from pydantic import BaseModel

from smp import enumeration_management as smpenum
from smp import header as smphdr
from smp import message as smpmsg
from tests.helpers import make_assert_header

T = TypeVar("T", bound=smpmsg._MessageBase)


def _do_test(
msg: Type[T],
op: smphdr.OP,
command_id: smphdr.CommandId.EnumManagement,
data: Dict[str, Any],
nested_model: Type[BaseModel] | None = None,
) -> T:
cbor = cbor2.dumps(data, canonical=True)
assert_header = make_assert_header(smphdr.GroupId.ENUM_MANAGEMENT, op, command_id, len(cbor))

def _assert_common(r: smpmsg._MessageBase) -> None:
assert_header(r)
for k, v in data.items():
if type(v) is tuple and nested_model is not None:
for v2 in v:
assert v2 == nested_model(**v2).model_dump()
else:
assert v == getattr(r, k)
assert cbor == r.BYTES[8:]

r = msg(**data)

_assert_common(r) # serialize
_assert_common(msg.loads(r.BYTES)) # deserialize

return r


def test_GroupCountRequest() -> None:
_do_test(
smpenum.GroupCountRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_COUNT,
{},
)


def test_GroupCountResponse() -> None:
r = _do_test(
smpenum.GroupCountResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_COUNT,
{"count": 2},
)
assert r.count == 2


def test_ListOfGroupsRequest() -> None:
_do_test(
smpenum.ListOfGroupsRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.LIST_OF_GROUPS,
{},
)


def test_ListOfGroupsResponse() -> None:
r = _do_test(
smpenum.ListOfGroupsResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.LIST_OF_GROUPS,
{"groups": (2, smphdr.GroupId.RUNTIME_TESTS, 15, 64)},
)
assert r.groups == (2, 5, 15, 64)

assert type(r.groups[0]) is smphdr.GroupId
assert r.groups[0] == smphdr.GroupId.STATISTICS_MANAGEMENT

assert type(r.groups[1]) is smphdr.GroupId
assert r.groups[1] == smphdr.GroupId.RUNTIME_TESTS

assert type(r.groups[2]) is int
assert r.groups[2] == 15

assert type(r.groups[3]) is smphdr.UserGroupId
assert r.groups[3] == smphdr.UserGroupId.INTERCREATE


@pytest.mark.parametrize("index", [0, 1, None])
def test_GroupIdRequest(index: int | None) -> None:
_do_test(
smpenum.GroupIdRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_ID,
{"index": index} if index is not None else {},
)


def test_GroupIdResponse() -> None:
r = _do_test(
smpenum.GroupIdResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_ID,
{"group": 2},
)
assert r.group == smphdr.GroupId.STATISTICS_MANAGEMENT
assert type(r.group) is smphdr.GroupId
assert not r.end


def test_GroupDetailsRequest() -> None:
_do_test(
smpenum.GroupDetailsRequest,
smphdr.OP.READ,
smphdr.CommandId.EnumManagement.GROUP_DETAILS,
{"groups": (smphdr.GroupId.STATISTICS_MANAGEMENT, smphdr.GroupId.RUNTIME_TESTS, 15)},
)


def test_GroupDetailsResponse() -> None:
r = _do_test(
smpenum.GroupDetailsResponse,
smphdr.OP.READ_RSP,
smphdr.CommandId.EnumManagement.GROUP_DETAILS,
{
"groups": (
{"id": 2, "name": "group2", "handlers": 2},
{"id": 5, "name": "group5", "handlers": 5},
{"id": 15, "name": "group15", "handlers": 15},
{"id": 64, "name": "group64", "handlers": 64},
)
},
nested_model=smpenum.GroupDetails,
)
assert r.groups == (
smpenum.GroupDetails(id=2, name="group2", handlers=2),
smpenum.GroupDetails(id=5, name="group5", handlers=5),
smpenum.GroupDetails(id=15, name="group15", handlers=15),
smpenum.GroupDetails(id=64, name="group64", handlers=64),
)
assert type(r.groups[0].id) is smphdr.GroupId
assert type(r.groups[1].id) is smphdr.GroupId
assert type(r.groups[2].id) is int
assert type(r.groups[3].id) is smphdr.UserGroupId

0 comments on commit 0645aab

Please sign in to comment.