From 0394b3c7007479ec26d7baa7a49e537d449e9651 Mon Sep 17 00:00:00 2001 From: Peter Wegmann Date: Tue, 3 Sep 2024 16:12:53 +0200 Subject: [PATCH] preliminary nxstruct --- src/secop_ophyd/SECoPDevices.py | 113 +++++++++++++++++++++++++++++++- src/secop_ophyd/SECoPSignal.py | 14 ++++ tests/test_Nexus.py | 9 +++ 3 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 tests/test_Nexus.py diff --git a/src/secop_ophyd/SECoPDevices.py b/src/secop_ophyd/SECoPDevices.py index 2863d73..63c66c1 100644 --- a/src/secop_ophyd/SECoPDevices.py +++ b/src/secop_ophyd/SECoPDevices.py @@ -31,7 +31,7 @@ TupleOf, ) from ophyd_async.core.async_status import AsyncStatus -from ophyd_async.core.signal import SignalR, SignalRW, SignalX, observe_value +from ophyd_async.core.signal import Signal, SignalR, SignalRW, SignalX, observe_value from ophyd_async.core.standard_readable import ( ConfigSignal, HintedSignal, @@ -315,12 +315,15 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str): """ self.value: SignalR + self.descriptiom: SignalR super().__init__(secclient=secclient) self._module = module_name module_desc = secclient.modules[module_name] self.plans: list[Method] = [] + self.mod_prop_devices: Dict[str, SignalR] = {} + self.param_devices: Dict[str, T] = {} # generate Signals from Module Properties for property in module_desc["properties"]: @@ -330,6 +333,7 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str): setattr(self, property, SignalR(backend=propb)) self._config.append(getattr(self, property)) + self.mod_prop_devices[property] = getattr(self, property) # generate Signals from Module parameters eiter r or rw for parameter, properties in module_desc["parameters"].items(): @@ -345,6 +349,7 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str): sig_name=parameter, readonly=readonly, ) + self.param_devices[parameter] = getattr(self, parameter) # Initialize Command Devices for command, properties in module_desc["commands"].items(): @@ -479,6 +484,67 @@ def _signal_from_parameter(self, path: Path, sig_name: str, readonly: bool): else: self._config.append(getattr(self, sig_name)) + async def generate_nexus_parameter_log(self, param_name: str) -> str: + + sig: Signal = getattr(self, param_name) + + sig_backend: SECoPParamBackend = sig._backend + + # check if vlaue is numeric (NXlog only supports mumerical values) + if not sig_backend.is_number(): + return "" + + param_unit = f'"{sig_backend.get_unit()}"' + + unit_line = ( + f"\n\t\t@units = {param_unit}" if sig_backend.get_unit() is not None else "" + ) + + log_name = "value_log" if param_name == "value" else param_name + + text = f""" +{log_name}:NXlog +\t@NX_class = NXlog +\tvalue:NX = {self.value.name}{unit_line} +\t\t@type = "{sig_backend.describe_dict['SECoP_dtype']}" +\ttime:NX_NUMBER = {self.value.name}-timestamp +\t\t@start = /:NXentry/start_time +\t\t@units = "s" + +""" + + return text + + async def generate_nexus_struct(self) -> str: + + implementation: str = str(await self.implementation.get_value()) + description: str = str(await self.description.get_value()) + + text = f""" +{self._module}:NXsensor +\t@NX_class = NXsensor +\tname:NX_CHAR = "{self._module}" +\tmeasurement:NX_CHAR = "TODO" +\t\t@secop_importance:NX_INT= 0 +\tmodel:NX_CHAR = "{implementation}" +\tdescription:NX_CHAR = "{description}" + +""" + value_log: str = await self.generate_nexus_parameter_log("value") + + text += "\t".join(value_log.splitlines(True)) + + text += "\tparameters:NXcollection" + for parameter in self.param_devices.keys(): + if parameter == "value": + continue + + param_log: str = await self.generate_nexus_parameter_log(parameter) + + text += "\t".join(param_log.splitlines(True)) + + return text + class SECoPWritableDevice(SECoPReadableDevice): """Fast settable device target""" @@ -587,11 +653,17 @@ def __init__(self, secclient: AsyncFrappyClient): :param secclient: SECoP client providing communication to the SEC Node :type secclient: AsyncFrappyClient """ + + self.equipment_id: SignalR + self.description: SignalR + self.version: SignalR + self._secclient: AsyncFrappyClient = secclient self._module_name: str = "" self._node_cls_name: str = "" - self.mod_devices: Dict[str, T] = {} + self.mod_devices: Dict[str, SECoPReadableDevice] = {} + self.node_prop_devices: Dict[str, SignalR] = {} self.genCode: GenNodeCode @@ -604,6 +676,7 @@ def __init__(self, secclient: AsyncFrappyClient): propb = PropertyBackend(property, self._secclient.properties, secclient) setattr(self, property, SignalR(backend=propb)) config.append(getattr(self, property)) + self.node_prop_devices[property] = getattr(self, property) for module, module_desc in self._secclient.modules.items(): secop_dev_class = class_from_interface(module_desc["properties"]) @@ -705,7 +778,7 @@ async def disconnect_async(self): def class_from_instance(self, path_to_module: str | None = None): """Dynamically generate python class file for the SECoP_Node_Device, this allows autocompletion in IDEs and eases working with the generated Ophyd - devices + devices24246316.542 """ # parse genClass file if already present @@ -828,6 +901,34 @@ def nodeStateChange(self, online, state): # noqa: N802 if state == "connected" and online is True: self._secclient.conn_timestamp = ttime.time() + async def generate_nexus_struct(self) -> str: + + equipment_id: str = str(await self.equipment_id.get_value()) + + if hasattr(self, "firmware"): + firmware: str = str(await self.firmware.get_value()) + else: + firmware = "" + + version: str = str(await self.version.get_value()) + description: str = str(await self.description.get_value()) + + text = f""" +{equipment_id}:NXenvironment +\t@NX_class = NXenvironment +\tname:NX_CHAR = "{equipment_id}" +\tshort_name:NX_CHAR = {equipment_id.split('.')[0]} +\ttype:NX_CHAR = "{firmware} ({version})" +\tdescription:NX_CHAR = "{description}" + +""" + for module in self.mod_devices.values(): + mod_str: str = await module.generate_nexus_struct() + + text += "\t".join(mod_str.splitlines(True)) + + return text + IF_CLASSES = { "Drivable": SECoPMoveableDevice, @@ -837,6 +938,12 @@ def nodeStateChange(self, online, state): # noqa: N802 "Communicator": SECoPReadableDevice, } +SECOP_TO_NEXUS_TYPE = { + "double": "NX_FLOAT64", + "int": "NX_INT64", + "scaled": "NX_FLOAT64", +} + ALL_IF_CLASSES = set(IF_CLASSES.values()) diff --git a/src/secop_ophyd/SECoPSignal.py b/src/secop_ophyd/SECoPSignal.py index 37b06bb..f25d02f 100644 --- a/src/secop_ophyd/SECoPSignal.py +++ b/src/secop_ophyd/SECoPSignal.py @@ -271,6 +271,8 @@ def __init__(self, path: Path, secclient: AsyncFrappyClient) -> None: for property_name, prop_val in self.datainfo.items(): if property_name == "type": property_name = "SECoP_dtype" + if property_name == "unit": + property_name = "units" self.describe_dict[property_name] = prop_val def source(self, name: str) -> str: @@ -339,6 +341,18 @@ def get_param_path(self): def get_path_tuple(self): return self.path.get_path_tuple() + def get_unit(self): + return self.describe_dict.get("units", None) + + def is_number(self) -> bool: + if ( + self.describe_dict["dtype"] == "number" + or self.describe_dict["dtype"] == "integer" + ): + return True + + return False + class PropertyBackend(SignalBackend): """Readonly backend for static SECoP Properties of Nodes/Modules""" diff --git a/tests/test_Nexus.py b/tests/test_Nexus.py new file mode 100644 index 0000000..7c0ab73 --- /dev/null +++ b/tests/test_Nexus.py @@ -0,0 +1,9 @@ +from secop_ophyd.SECoPDevices import SECoPNodeDevice + + +async def test_node_nexus_struct(cryo_sim, cryo_node_internal_loop: SECoPNodeDevice): + generated_text = await cryo_node_internal_loop.generate_nexus_struct() + + print(generated_text) + + await cryo_node_internal_loop.disconnect_async()