Skip to content

Commit

Permalink
feat: 脚本处理
Browse files Browse the repository at this point in the history
  • Loading branch information
pinkhello committed Sep 9, 2023
1 parent 0999f3f commit 1400a17
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 216 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
uses: docker/build-push-action@v4
with:
context: .
file: ./devops/nano.mock.dockerfile
file: ./deploy/nano.mock.dockerfile
push: true
tags: ${{ env.REGISTRY }}/bayer-di-pub/nano_mock:latest
platforms: linux/amd64,linux/arm64
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,6 @@ ros2 launch nano nano.launch.py start:=true

> [http://localhost:18083/](http://localhost:18083/) (admin/public)

- `nano` restful api地址

> - redoc <http://localhost:8001/redoc>
> - openapi <http://localhost:8001/docs>

- `nano`(占领 `8001` 端口)【`可以更改`

- `nano` [`Nano` `mqtt` 消息说明](./nano/nano/README.md) (`Backend`需要关心)

```shell
Expand Down
2 changes: 0 additions & 2 deletions nano/nano/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,3 @@

# def __init__(self, detail: str):
# self.detail = detail


36 changes: 18 additions & 18 deletions nano/nano/core/command/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import json
import time

from ..schemas.message import AgvUpMsg, MessageType, CmdType, MqttMsgReq
from ..common.config import Settings
from ..schemas.message import AgvUpMsg, MessageType, CmdType, MqttMsgReq

"""
ros to mqtt topic 映射, 定制好后不会再变化 {ros_topic: [mqtt_attr, mqtt_topic, 需要多次发送的次数]}
Expand Down Expand Up @@ -63,34 +63,34 @@
'task_report_receipt': [CmdType.task_report_receipt.value, '/task_report_receipt', False]
}


class Converter():
def __init__(self,
settings: Settings):
self.settings = settings


def _topic_params(self):
env_prefix = 'test' if self.settings.environment != 'production' else 'prod'
return self.settings.name, self.settings.device_no, env_prefix


def convert_to_ros_pack(self, mqtt_topic: str, data: any) -> any: # type: ignore
if mqtt_topic:
cmd_name = mqtt_topic.split('/')[-1]
d = mqtt_2_ros_topic_map[cmd_name]
# cmd_type, ros_topic, need_ack, data, cmd_name
return d[0], d[1], d[2], data, cmd_name

if cmd_name in mqtt_2_ros_topic_map.keys():
d = mqtt_2_ros_topic_map[cmd_name]
# cmd_type, ros_topic, need_ack, data, cmd_name
return d[0], d[1], d[2], data, cmd_name

def convert_to_mqtt_pack(self, ros_topic: str, trace_id: str, data: any) -> any: # type: ignore
d = ros_2_mqtt_topic_map[ros_topic]
"""/nano/{env}/up/{device_no}/{topic_name}"""
if d:
msg_type = d[0]
topic = d[1]
name, device_no, env_prefix = self._topic_params()
mqtt_topic = f"/{name}/{env_prefix}/up/{device_no}/{topic}"
# TODO: 根据消息构建数据
agv_up_msg = AgvUpMsg(trace_id=trace_id, device_no=device_no, msg_type=msg_type, data=data,
ts=int(time.time() * 1000))
return MqttMsgReq(topic=mqtt_topic, msg=json.dumps(agv_up_msg.dict())), d[2]
if ros_topic and ros_topic in ros_2_mqtt_topic_map.keys():
d = ros_2_mqtt_topic_map[ros_topic]
"""/nano/{env}/up/{device_no}/{topic_name}"""
if d:
msg_type = d[0]
topic = d[1]
name, device_no, env_prefix = self._topic_params()
mqtt_topic = f"/{name}/{env_prefix}/up/{device_no}/{topic}"
# TODO: 根据消息构建数据
agv_up_msg = AgvUpMsg(trace_id=trace_id, device_no=device_no, msg_type=msg_type, data=data,
ts=int(time.time() * 1000))
return MqttMsgReq(topic=mqtt_topic, msg=json.dumps(agv_up_msg.dict())), d[2]
34 changes: 17 additions & 17 deletions nano/nano/core/command/downstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
@Software: 代理ROS2数据、指令到MQTT应用
"""
import json

from std_msgs.msg import String
from paho.mqtt.client import Client

from ..common.config import Settings
from ..common.logger import Logger
from ..schemas.message import CmdType
from std_msgs.msg import String

from .convert import Converter
from .file_processor import FileProcess
from .total_task_report_queue import ReportOrderedDict
from ..common.config import Settings
from ..common.logger import Logger
from ..schemas.message import CmdType


class DownStream():
Expand All @@ -30,18 +28,20 @@ def __init__(self, settings: Settings, logger: Logger, cache_publishers: dict, r
async def cmd_process(self, client: Client, client_id: str, topic: str, msg: str):
# 云端 MQTT 下发指令处理 || 本地 MQTT 下发指令处理
cmd_type, ros_topic, need_ack, data, _ = self.converter.convert_to_ros_pack(mqtt_topic=topic, data=msg)
json_data = json.loads(data)
if need_ack:
self._cmd_ack(client=client, json_data=json_data)
self.logger.sys_log.info(f"MQTT => ROS : {topic} => {ros_topic}, 消息内容: {data}")
# nano_node_cache['node']
self._mqtt_to_ros2(ros_topic=ros_topic, msg=json_data, cmd_type=cmd_type)

if cmd_type:
json_data = json.loads(data)
if need_ack:
self._cmd_ack(client=client, json_data=json_data)
self.logger.sys_log.info(f"MQTT => ROS : {topic} => {ros_topic}, 消息内容: {data}")
# nano_node_cache['node']
self._mqtt_to_ros2(ros_topic=ros_topic, msg=json_data, cmd_type=cmd_type)

def _cmd_ack(self, client: Client, json_data: dict):
if 'trace_id' in json_data:
cmd_ack_msg, _ = self.converter.convert_to_mqtt_pack(ros_topic='/cmd_ack', trace_id=json_data['trace_id'], data=json.dumps(json_data))
client.publish(topic=cmd_ack_msg.topic, payload=cmd_ack_msg.msg, qos=cmd_ack_msg.qos) # type: ignore
cmd_ack_msg, _ = self.converter.convert_to_mqtt_pack(ros_topic='/cmd_ack', trace_id=json_data['trace_id'],
data=json.dumps(json_data))
if cmd_ack_msg:
client.publish(topic=cmd_ack_msg.topic, payload=cmd_ack_msg.msg, qos=cmd_ack_msg.qos) # type: ignore

def _mqtt_to_ros2(self, ros_topic: str, msg: dict, cmd_type: str):
"""TODO: 需要发布信息数据"""
Expand All @@ -58,12 +58,12 @@ def _mqtt_to_ros2(self, ros_topic: str, msg: dict, cmd_type: str):
version_msg = String()
version_msg.data = str(version)
self.cache_publishers['/road_network'].publish(version_msg)

if cmd_type in [CmdType.task_report_receipt.value]:
# 预备从队列中 移除任务报告的发送, 因为任务报告发送完毕
self.report_dict.remove(key=msg['trace_id'])

if cmd_type in [CmdType.deploy_task.value, CmdType.ctrl.value, CmdType.pong.value]:
ros_msg = String()
ros_msg.data = json.dumps(msg['data'])
self.cache_publishers[ros_topic].publish(ros_msg)
self.cache_publishers[ros_topic].publish(ros_msg)
8 changes: 3 additions & 5 deletions nano/nano/core/command/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import os
import requests

from typing import Optional

from ..common.config import Settings
Expand All @@ -19,7 +18,7 @@ class FileProcess():
def __init__(self, settings: Settings, logger: Logger):
self.settings = settings
self.logger = logger

def download_to_save(self, url: str, version: Optional[str] = None) -> bool:
"""下载"""
dir_path = self.settings.road_path
Expand Down Expand Up @@ -60,8 +59,6 @@ def download_to_save(self, url: str, version: Optional[str] = None) -> bool:
except Exception as e:
self.logger.sys_log.error(f"下载路网错误 {url}", e)
return False



def read_to_upload(self, url: str, version: Optional[str] = None):
dir_path = self.settings.maps_path
Expand All @@ -80,7 +77,8 @@ def read_to_upload(self, url: str, version: Optional[str] = None):

files = []
for file_name in file_names:
files.append(('files', (file_name, open(os.path.join(dir_path, file_name), 'rb'), 'application/octet-stream')))
files.append(
('files', (file_name, open(os.path.join(dir_path, file_name), 'rb'), 'application/octet-stream')))

if len(files) > 0:
# 查看是不是 yaml,读取版本进行处理
Expand Down
73 changes: 7 additions & 66 deletions nano/nano/core/command/total_task_report_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
@Software: 代理ROS2数据、指令到MQTT应用
"""
from collections import OrderedDict
from threading import RLock, Thread, Timer
from threading import RLock

from ..common.logger import Logger

Expand All @@ -17,97 +17,38 @@ def __init__(self, report: any, max_count: int):
self.counter = 0



class ReportOrderedDict():

def __init__(self, logger: Logger):
self.logger = logger
self.cache = OrderedDict()
self.lock = RLock()

def add(self, key:str, max_count: int, value: any):
def add(self, key: str, max_count: int, value: any):
with self.lock:
if key not in self.cache:
self.cache[key] = ReportItem(value, max_count)


def remove(self, key:str):
def remove(self, key: str):
self._remove(key=key, trigger='ack')

def _remove(self, key:str, trigger: str):
def _remove(self, key: str, trigger: str):
with self.lock:
if key in self.cache:
self.logger.sys_log.info(f'{trigger} remove-> {key}')
del self.cache[key]


def get(self, key:str):
def get(self, key: str):
with self.lock:
if key in self.cache:
return self.cache[key]

def process(self, callback):
with self.lock:
items = list(self.cache.items())
for key, obj in items:
self.logger.sys_log.info(str(obj.counter) + " " + key)
if obj.counter >= obj.max_count:
self._remove(key=key, trigger='max')
else:
obj.counter += 1
callback(obj.report)



# def _add(report_dict: ReportOrderedDict):

# while True:

# import time
# time.sleep(3)

# import uuid
# k = f"{uuid.uuid4()}"
# report_dict.add(k, k)

# def _remove(report_dict: ReportOrderedDict):
# import time
# time.sleep(10)

# print("====> 手动移除 x2")
# report_dict.remove("x2")


# def _callback(arg):
# print(f"====> 打印 callback ={arg}")

# def _print(report_dict: ReportOrderedDict):
# while True:
# import time
# time.sleep(3)
# report_dict.process(max=10, callback=_callback)


# if __name__ == '__main__':
# report_dict = ReportOrderedDict()
# report_dict.add("x1", "x1")
# report_dict.add("x2", "x2")
# report_dict.add("x3", "x3")

# thread_add = Thread(target=_add, args={report_dict,})
# thread_remove = Thread(target=_remove, args={report_dict,})

# thread_add.start()
# thread_remove.start()





# thread_print = Thread(target=_print, args={report_dict,})
# thread_print.start()

# thread_print.join()
# thread_add.join()
# thread_remove.join()

21 changes: 7 additions & 14 deletions nano/nano/core/command/upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@
@Software: 代理ROS2数据、指令到MQTT应用
"""
import json
from uuid import uuid4
from rosidl_runtime_py import message_to_ordereddict
from uuid import uuid4

from .convert import Converter
from .total_task_report_queue import ReportOrderedDict
from ..common.config import Settings
from ..common.logger import Logger
from ..schemas.message import MqttClientType, MqttMsgReq
from .convert import Converter
from .total_task_report_queue import ReportOrderedDict



class UpStream():
def __init__(self, settings: Settings, logger: Logger, mqtt_clients: dict, report_dict: ReportOrderedDict):
self.settings = settings
self.logger = logger
self.converter = Converter(settings=self.settings)
self.converter = Converter(settings=self.settings)
self.mqtt_clients = mqtt_clients
self.report_dict = report_dict

Expand All @@ -41,8 +40,6 @@ def total_task_report_callback(self, msg):

def sub_task_report_callback(self, msg):
self._ros2_to_mqtt(ros_topic='/task/sub_task_report', msg=msg)



def _ros2_to_mqtt(self, ros_topic: str, msg: any): # type: ignore
"""ros msg 转成 Mqtt msg 转发到 mqtt server"""
Expand All @@ -51,26 +48,23 @@ def _ros2_to_mqtt(self, ros_topic: str, msg: any): # type: ignore
data = json.dumps(msg_dict)
mqtt_msg, count = self.converter.convert_to_mqtt_pack(ros_topic=ros_topic, trace_id=trace_id, data=data)
if mqtt_msg:
self.logger.sys_log.info(f"ROS => MQTT : {ros_topic} => {mqtt_msg.topic}, 消息内容: {data}")
self.logger.sys_log.info(f"ROS => MQTT : {ros_topic} => {mqtt_msg.topic}, 消息内容: {mqtt_msg.msg}")
import asyncio
asyncio.run(self.async_all_publish(mqtt_msg))
if count > 1:
self.report_dict.add(key=trace_id, max_count=count, value=mqtt_msg)

def retry_send_to(self, mqtt_msg: MqttMsgReq):
self.logger.sys_log.info(f"ROS => MQTT [RETRY] : => {mqtt_msg.topic}, 消息内容: {mqtt_msg.msg}")
self.logger.sys_log.info(f"RETRY => MQTT : {mqtt_msg.topic}, 消息内容: {mqtt_msg.msg}")
import asyncio
asyncio.run(self.async_all_publish(mqtt_msg))



def _local_publish(self, msg: MqttMsgReq):
"""发送本地 MQTT"""
if msg:
msg.client = MqttClientType.LOCAL.value
self.client_publish(msg=msg)


def _cloud_publish(self, msg: MqttMsgReq):
"""发送云端 MQTT"""
if msg:
Expand All @@ -93,8 +87,7 @@ def client_publish(self, msg: MqttMsgReq):
else:
self.logger.sys_log.debug(f"MQTT客户端类型 {client_type}, 客户端为空, 发送消息 {msg.dict()}")


async def async_all_publish(self, msg: MqttMsgReq):
"""异步开启发送"""
self._local_publish(msg=msg)
self._cloud_publish(msg=msg)
self._cloud_publish(msg=msg)
Loading

0 comments on commit 1400a17

Please sign in to comment.