-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBridgeManager.py
78 lines (61 loc) · 2.67 KB
/
BridgeManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from abc import ABC, abstractmethod
from typing import Any, Callable

import carb.events
class Manager_Events:
    """
    Event-type name constants for a single bridge.

    Every attribute is a string of the form
    ``loupe.simulation.<bridge_name>.<EVENT>``, so each bridge gets its own
    set of event-type names.
    """

    def __init__(self, bridge_name: str):
        """
        Build the event-type names for the given bridge.

        Args:
            bridge_name (str): Name inserted verbatim into every event-type string.
        """
        # All event names share this namespace; build it once.
        namespace = f"loupe.simulation.{bridge_name}"

        self.EVENT_TYPE_DATA_INIT = f"{namespace}.DATA_INIT"
        self.EVENT_TYPE_DATA_READ = f"{namespace}.DATA_READ"
        self.EVENT_TYPE_DATA_READ_REQ = f"{namespace}.DATA_READ_REQ"
        self.EVENT_TYPE_DATA_WRITE_REQ = f"{namespace}.DATA_WRITE_REQ"
        self.EVENT_TYPE_CONNECTION = f"{namespace}.CONNECTION"
        self.EVENT_TYPE_STATUS = f"{namespace}.STATUS"
        self.EVENT_TYPE_ENABLE = f"{namespace}.ENABLE"
class BridgeManager(ABC):
    """
    Abstract interface for a bridge manager.

    Concrete subclasses must implement callback registration for the
    DATA_INIT and DATA_READ events, management of the cyclic read list,
    and writing of variable values to the bridge.
    """

    @abstractmethod
    def register_init_callback(self, callback: Callable[[carb.events.IEvent], None]) -> None:
        """
        Registers a callback function for the DATA_INIT event.

        The callback is triggered when the Beckhoff Bridge is initialized.
        The user should use this event to add cyclic read variables.
        This event may get called multiple times in normal operation due to
        the nature of how extensions are loaded, so the callback should
        tolerate repeated invocation.

        Args:
            callback (Callable): The callback function to be registered.
                It receives the event as its only argument.

        Returns:
            None
        """

    @abstractmethod
    def register_data_callback(self, callback: Callable[[carb.events.IEvent], None]) -> None:
        """
        Registers a callback function for the DATA_READ event.

        The callback is triggered when the Beckhoff Bridge receives new data.
        The payload contains the updated variables.

        Args:
            callback (Callable): The callback function to be registered.
                It receives the event as its only argument.

                example callback:
                    def on_message( event ):
                        data = event.payload['data']['MAIN']['custom_struct']['var_array']

        Returns:
            None
        """

    @abstractmethod
    def add_cyclic_read_variables(self, variable_name_array: list[str]) -> None:
        """
        Adds variables to the cyclic read list.

        Variables in the cyclic read list are read from the Beckhoff Bridge
        at a fixed interval.

        Args:
            variable_name_array (list[str]): Names of the variables to be added.
                ["MAIN.myStruct.myvar1", "MAIN.var2", ...]

        Returns:
            None
        """

    @abstractmethod
    def write_variable(self, name: str, value: Any) -> None:
        """
        Writes a variable value to the Beckhoff Bridge.

        Args:
            name (str): The name of the variable. "MAIN.myStruct.myvar1"
            value (Any): The value to be written, a basic type. 1, 2.5, "Hello", ...

        Returns:
            None
        """