Skip to content

Commit

Permalink
(fix) Fixes type hints for python3.8 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
mkabtoul committed Feb 5, 2025
1 parent 17355c3 commit 24018b1
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 31 deletions.
10 changes: 5 additions & 5 deletions ros_sugar/config/base_attrs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from types import NoneType, GenericAlias
# from types import NoneType, GenericAlias
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -82,7 +82,7 @@ def __is_valid_arg_of_union_type(cls, obj, union_types) -> bool:
:rtype: _type_
"""
_types = [
get_origin(t) if isinstance(t, (GenericAlias, _GenericAlias)) else t
get_origin(t) if isinstance(t, _GenericAlias) else t
for t in get_args(union_types)
]
return any(isinstance(obj, t) for t in _types)
Expand Down Expand Up @@ -127,7 +127,7 @@ def __check_value_against_attr_type(
# Handles only the origin of GenericAlias (dict, list)
_attribute_type = (
get_origin(attribute_type)
if isinstance(attribute_type, (GenericAlias, _GenericAlias))
if isinstance(attribute_type, _GenericAlias)
else attribute_type
)
if not isinstance(value, _attribute_type):
Expand Down Expand Up @@ -200,7 +200,7 @@ def from_dict(self, dict_obj: Dict) -> None:
def from_yaml(
self,
file_path: str,
nested_root_name: str | None = None,
nested_root_name: Union[str, None] = None,
get_common: bool = False,
) -> None:
"""
Expand Down Expand Up @@ -312,7 +312,7 @@ def __dict_to_serialized_dict(self, dictionary):
dictionary[name] = tuple(self.__list_to_serialized_list(list(value)))
elif isinstance(value, Dict):
dictionary[name] = self.__dict_to_serialized_dict(value)
elif type(value) not in [float, int, str, bool, NoneType]:
elif type(value) not in [float, int, str, bool]:
dictionary[name] = str(value)
return dictionary

Expand Down
2 changes: 1 addition & 1 deletion ros_sugar/core/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ def got_all_inputs(
return False
return True

def get_missing_inputs(self) -> list[str]:
def get_missing_inputs(self) -> List[str]:
"""
Get a list of input topic names not being published
Expand Down
17 changes: 8 additions & 9 deletions ros_sugar/io/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

import inspect
from types import ModuleType
from typing import Any, List, Optional, Union, Dict
from typing import Any, List, Optional, Union, Dict, Type
from attrs import Factory, define, field
from ..config import BaseAttrs, QoSConfig, base_validators

from . import supported_types


def get_all_msg_types(
msg_types_module: ModuleType = supported_types,
) -> List[type[supported_types.SupportedType]]:
) -> List[Type[supported_types.SupportedType]]:
"""
Gets all message types from supported data_types
:return: Supported data types
Expand Down Expand Up @@ -49,9 +48,9 @@ def __parse_name_without_class(type_name: str) -> str:


def get_msg_type(
type_name: Union[type[supported_types.SupportedType], str],
type_name: Union[Type[supported_types.SupportedType], str],
msg_types_module: Optional[ModuleType] = supported_types,
) -> Union[type[supported_types.SupportedType], str]:
) -> Union[Type[supported_types.SupportedType], str]:
"""
Gets a message type from supported data_types given a string name
:param type_name: Message name
Expand Down Expand Up @@ -85,8 +84,8 @@ def get_msg_type(


def _get_msg_types(
type_names: List[Union[type[supported_types.SupportedType], str]],
) -> List[Union[type[supported_types.SupportedType], str]]:
type_names: List[Union[Type[supported_types.SupportedType], str]],
) -> List[Union[Type[supported_types.SupportedType], str]]:
"""
Gets a list of message types from supported data_types given a list of string names
:param type_name: List of message names
Expand Down Expand Up @@ -138,7 +137,7 @@ class Topic(BaseAttrs):
"""

name: str = field(converter=_normalize_topic_name)
msg_type: Union[type[supported_types.SupportedType], str] = field(
msg_type: Union[Type[supported_types.SupportedType], str] = field(
converter=get_msg_type
)
qos_profile: Union[Dict, QoSConfig] = field(
Expand All @@ -161,7 +160,7 @@ def _msg_type_validator(self, _, val):
class AllowedTopics(BaseAttrs):
"""Configure allowed types to restrict a component Topic"""

types: List[Union[type[supported_types.SupportedType], str]] = field(
types: List[Union[Type[supported_types.SupportedType], str]] = field(
converter=_get_msg_types
)
number_required: int = field(
Expand Down
15 changes: 7 additions & 8 deletions ros_sugar/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,15 @@ def _parse_array_type(arr: np.ndarray, ros_msg_cls: type) -> np.ndarray:
:return: Parsed data
:rtype: np.ndarray
"""
match ros_msg_cls:
case std_msg.Float32MultiArray:
arr = arr.astype(np.float32)
case std_msg.Float64MultiArray:
arr = arr.astype(np.float64)
case std_msg.Int16MultiArray:
if ros_msg_cls==std_msg.Float32MultiArray:
arr = arr.astype(np.float32)
elif ros_msg_cls==std_msg.Float64MultiArray:
arr = arr.astype(np.float64)
elif ros_msg_cls==std_msg.Int16MultiArray:
arr = arr.astype(np.int16)
case std_msg.Int32MultiArray:
elif ros_msg_cls==std_msg.Int32MultiArray:
arr = arr.astype(np.int32)
case std_msg.Int64MultiArray:
elif ros_msg_cls==std_msg.Int64MultiArray:
arr = arr.astype(np.int64)
return arr

Expand Down
6 changes: 3 additions & 3 deletions ros_sugar/launch/executable.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import argparse
import logging
from typing import Optional, List, Type
from typing import Optional, List, Type, Tuple

import rclpy
import setproctitle
from rclpy.executors import MultiThreadedExecutor
from rclpy.utilities import try_shutdown


def _parse_args() -> tuple[argparse.Namespace, List[str]]:
def _parse_args() -> Tuple[argparse.Namespace, List[str]]:
"""Parse arguments."""
parser = argparse.ArgumentParser(description="Component Executable Config")
parser.add_argument(
Expand Down Expand Up @@ -89,7 +89,7 @@ def _parse_component_config(
return config


def _parse_ros_args(args_names: list[str]) -> list[str]:
def _parse_ros_args(args_names: List[str]) -> List[str]:
"""Parse ROS arguments from command line arguments
:param args_names: List of all parsed arguments
Expand Down
4 changes: 2 additions & 2 deletions ros_sugar/launch/launcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Launcher"""

from __future__ import annotations
import os
import inspect
import sys
Expand Down Expand Up @@ -82,7 +82,7 @@ class Launcher:
def __init__(
self,
namespace: str = "",
config_file: str | None = None,
config_file: Optional[str] = None,
enable_monitoring: bool = True,
activation_timeout: Optional[float] = None,
) -> None:
Expand Down
6 changes: 3 additions & 3 deletions ros_sugar/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
from enum import IntEnum as BaseIntEnum
from functools import wraps
from typing import Callable, List
from typing import Callable, List, Union

from rclpy.utilities import ok as rclpy_is_ok
from rclpy.lifecycle import Node as LifecycleNode
Expand Down Expand Up @@ -37,7 +37,7 @@ class IntEnum(BaseIntEnum):
"""

@classmethod
def get_enum(cls, __value: int) -> int | None:
def get_enum(cls, __value: int) -> Union[int, None]:
"""
Get Enum members equal to given values
Expand Down Expand Up @@ -75,7 +75,7 @@ def _wrapper(*args, **kwargs):
:param kw:
"""
return_type = inspect.signature(function).return_annotation
if return_type is not SomeEntitiesType:
if return_type is not SomeEntitiesType and return_type != 'SomeEntitiesType':
raise TypeError(
f"Action handlers must return launch event handlers 'launch.some_entities_type.SomeEntitiesType'. Method '{function.__name__}' cannot have '@action_handler' decorator"
)
Expand Down

0 comments on commit 24018b1

Please sign in to comment.