Skip to content

Commit

Permalink
(fix) Fixes deserialization of external processors and handling of pr…
Browse files Browse the repository at this point in the history
…ocessor result in launcher
  • Loading branch information
aleph-ra committed Oct 24, 2024
1 parent 2d6360f commit e5b93f3
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
5 changes: 3 additions & 2 deletions ros_sugar/core/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,9 +775,9 @@ def _external_processors_json(self, processors_serialized: Union[str, bytes]):
:param processors_serialized: Serialized Processors Dict
:type processors_serialized: Union[str, bytes]
"""
processors_data = json.loads(processors_serialized)
self._external_processors = json.loads(processors_serialized)
# Create sockets out of function names and connect them
for key, processor_data in processors_data.items():
for key, processor_data in self._external_processors.items():
for idx, func_name in enumerate(processor_data[0]):
sock_file = f"/tmp/{self.node_name}_{key}_{func_name}.socket"
if not os.path.exists(sock_file):
Expand Down Expand Up @@ -1285,6 +1285,7 @@ def _attach_external_processors(self):
"""
Attach external processors
"""
self.get_logger().info('ATTACHING EXTERNAL PROCESSORS')
for topic_name, (
processors,
processor_type,
Expand Down
7 changes: 4 additions & 3 deletions ros_sugar/launch/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ def __listen_for_external_processing(self, sock: socket.socket, func: Callable):
import msgpack

# Block to accept connections
conn, addr = sock.accept()
logger.info(f"Processor connected from {addr}")
conn, _ = sock.accept()
logger.info(f"EXTERNAL PROCESSOR CONNECTED ON {conn}")
while True:
# TODO: Make the buffer size a parameter
# Block to receive data
Expand All @@ -532,8 +532,9 @@ def __listen_for_external_processing(self, sock: socket.socket, func: Callable):
# TODO: Retreive errors
data = msgpack.unpackb(data)
result = func(data)
logger.debug(f"Got result from external processor: {result}")
result = msgpack.packb(result)
conn.sendall(data)
conn.sendall(result)

def _setup_external_processors(self, component: BaseComponent) -> None:
if not component._external_processors:
Expand Down

0 comments on commit e5b93f3

Please sign in to comment.