Skip to content

Commit

Permalink
preliminary nxstruct
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Wegmann committed Sep 3, 2024
1 parent e90a791 commit 0394b3c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 3 deletions.
113 changes: 110 additions & 3 deletions src/secop_ophyd/SECoPDevices.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
TupleOf,
)
from ophyd_async.core.async_status import AsyncStatus
from ophyd_async.core.signal import SignalR, SignalRW, SignalX, observe_value
from ophyd_async.core.signal import Signal, SignalR, SignalRW, SignalX, observe_value
from ophyd_async.core.standard_readable import (
ConfigSignal,
HintedSignal,
Expand Down Expand Up @@ -315,12 +315,15 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str):
"""

self.value: SignalR
self.descriptiom: SignalR

super().__init__(secclient=secclient)

self._module = module_name
module_desc = secclient.modules[module_name]
self.plans: list[Method] = []
self.mod_prop_devices: Dict[str, SignalR] = {}
self.param_devices: Dict[str, T] = {}

# generate Signals from Module Properties
for property in module_desc["properties"]:
Expand All @@ -330,6 +333,7 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str):

setattr(self, property, SignalR(backend=propb))
self._config.append(getattr(self, property))
self.mod_prop_devices[property] = getattr(self, property)

# generate Signals from Module parameters eiter r or rw
for parameter, properties in module_desc["parameters"].items():
Expand All @@ -345,6 +349,7 @@ def __init__(self, secclient: AsyncFrappyClient, module_name: str):
sig_name=parameter,
readonly=readonly,
)
self.param_devices[parameter] = getattr(self, parameter)

# Initialize Command Devices
for command, properties in module_desc["commands"].items():
Expand Down Expand Up @@ -479,6 +484,67 @@ def _signal_from_parameter(self, path: Path, sig_name: str, readonly: bool):
else:
self._config.append(getattr(self, sig_name))

async def generate_nexus_parameter_log(self, param_name: str) -> str:

sig: Signal = getattr(self, param_name)

sig_backend: SECoPParamBackend = sig._backend

# check if vlaue is numeric (NXlog only supports mumerical values)
if not sig_backend.is_number():
return ""

param_unit = f'"{sig_backend.get_unit()}"'

unit_line = (
f"\n\t\t@units = {param_unit}" if sig_backend.get_unit() is not None else ""
)

log_name = "value_log" if param_name == "value" else param_name

text = f"""
{log_name}:NXlog
\t@NX_class = NXlog
\tvalue:NX = {self.value.name}{unit_line}
\t\t@type = "{sig_backend.describe_dict['SECoP_dtype']}"
\ttime:NX_NUMBER = {self.value.name}-timestamp
\t\t@start = /:NXentry/start_time
\t\t@units = "s"
"""

return text

async def generate_nexus_struct(self) -> str:

implementation: str = str(await self.implementation.get_value())
description: str = str(await self.description.get_value())

text = f"""
{self._module}:NXsensor
\t@NX_class = NXsensor
\tname:NX_CHAR = "{self._module}"
\tmeasurement:NX_CHAR = "TODO"
\t\t@secop_importance:NX_INT= 0
\tmodel:NX_CHAR = "{implementation}"
\tdescription:NX_CHAR = "{description}"
"""
value_log: str = await self.generate_nexus_parameter_log("value")

text += "\t".join(value_log.splitlines(True))

text += "\tparameters:NXcollection"
for parameter in self.param_devices.keys():
if parameter == "value":
continue

param_log: str = await self.generate_nexus_parameter_log(parameter)

text += "\t".join(param_log.splitlines(True))

return text


class SECoPWritableDevice(SECoPReadableDevice):
"""Fast settable device target"""
Expand Down Expand Up @@ -587,11 +653,17 @@ def __init__(self, secclient: AsyncFrappyClient):
:param secclient: SECoP client providing communication to the SEC Node
:type secclient: AsyncFrappyClient
"""

self.equipment_id: SignalR
self.description: SignalR
self.version: SignalR

self._secclient: AsyncFrappyClient = secclient

self._module_name: str = ""
self._node_cls_name: str = ""
self.mod_devices: Dict[str, T] = {}
self.mod_devices: Dict[str, SECoPReadableDevice] = {}
self.node_prop_devices: Dict[str, SignalR] = {}

self.genCode: GenNodeCode

Expand All @@ -604,6 +676,7 @@ def __init__(self, secclient: AsyncFrappyClient):
propb = PropertyBackend(property, self._secclient.properties, secclient)
setattr(self, property, SignalR(backend=propb))
config.append(getattr(self, property))
self.node_prop_devices[property] = getattr(self, property)

for module, module_desc in self._secclient.modules.items():
secop_dev_class = class_from_interface(module_desc["properties"])
Expand Down Expand Up @@ -705,7 +778,7 @@ async def disconnect_async(self):
def class_from_instance(self, path_to_module: str | None = None):
"""Dynamically generate python class file for the SECoP_Node_Device, this
allows autocompletion in IDEs and eases working with the generated Ophyd
devices
devices24246316.542
"""

# parse genClass file if already present
Expand Down Expand Up @@ -828,6 +901,34 @@ def nodeStateChange(self, online, state): # noqa: N802
if state == "connected" and online is True:
self._secclient.conn_timestamp = ttime.time()

async def generate_nexus_struct(self) -> str:

equipment_id: str = str(await self.equipment_id.get_value())

if hasattr(self, "firmware"):
firmware: str = str(await self.firmware.get_value())
else:
firmware = ""

version: str = str(await self.version.get_value())
description: str = str(await self.description.get_value())

text = f"""
{equipment_id}:NXenvironment
\t@NX_class = NXenvironment
\tname:NX_CHAR = "{equipment_id}"
\tshort_name:NX_CHAR = {equipment_id.split('.')[0]}
\ttype:NX_CHAR = "{firmware} ({version})"
\tdescription:NX_CHAR = "{description}"
"""
for module in self.mod_devices.values():
mod_str: str = await module.generate_nexus_struct()

text += "\t".join(mod_str.splitlines(True))

return text


IF_CLASSES = {
"Drivable": SECoPMoveableDevice,
Expand All @@ -837,6 +938,12 @@ def nodeStateChange(self, online, state): # noqa: N802
"Communicator": SECoPReadableDevice,
}

SECOP_TO_NEXUS_TYPE = {
"double": "NX_FLOAT64",
"int": "NX_INT64",
"scaled": "NX_FLOAT64",
}


ALL_IF_CLASSES = set(IF_CLASSES.values())

Expand Down
14 changes: 14 additions & 0 deletions src/secop_ophyd/SECoPSignal.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ def __init__(self, path: Path, secclient: AsyncFrappyClient) -> None:
for property_name, prop_val in self.datainfo.items():
if property_name == "type":
property_name = "SECoP_dtype"
if property_name == "unit":
property_name = "units"
self.describe_dict[property_name] = prop_val

def source(self, name: str) -> str:
Expand Down Expand Up @@ -339,6 +341,18 @@ def get_param_path(self):
def get_path_tuple(self):
return self.path.get_path_tuple()

def get_unit(self):
return self.describe_dict.get("units", None)

def is_number(self) -> bool:
if (
self.describe_dict["dtype"] == "number"
or self.describe_dict["dtype"] == "integer"
):
return True

return False


class PropertyBackend(SignalBackend):
"""Readonly backend for static SECoP Properties of Nodes/Modules"""
Expand Down
9 changes: 9 additions & 0 deletions tests/test_Nexus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from secop_ophyd.SECoPDevices import SECoPNodeDevice


async def test_node_nexus_struct(cryo_sim, cryo_node_internal_loop: SECoPNodeDevice):
generated_text = await cryo_node_internal_loop.generate_nexus_struct()

print(generated_text)

await cryo_node_internal_loop.disconnect_async()

0 comments on commit 0394b3c

Please sign in to comment.