From 5017db41636daf6c26c63113bef1f657a410f83d Mon Sep 17 00:00:00 2001 From: Pavel Vasilyev Date: Fri, 24 Jan 2025 15:39:49 +0100 Subject: [PATCH] WIP: Create python scripts to parse CD from .dts file and generate CDP1 Scripts are in progress. Status: - CD to CDP1 kind of completed but correctness is not checked. - .dts to CD is in progress, can extract models and elements, need to parse relationship Signed-off-by: Pavel Vasilyev --- samples/bluetooth/mesh/ngcdp/cdp1_gen.py | 200 ++++++++++++++++++ samples/bluetooth/mesh/ngcdp/cdp_dts_parse.py | 88 ++++++++ 2 files changed, 288 insertions(+) create mode 100644 samples/bluetooth/mesh/ngcdp/cdp1_gen.py create mode 100644 samples/bluetooth/mesh/ngcdp/cdp_dts_parse.py diff --git a/samples/bluetooth/mesh/ngcdp/cdp1_gen.py b/samples/bluetooth/mesh/ngcdp/cdp1_gen.py new file mode 100644 index 000000000000..fd60187728c2 --- /dev/null +++ b/samples/bluetooth/mesh/ngcdp/cdp1_gen.py @@ -0,0 +1,200 @@ +import struct + +class ModRelation: + def __init__(self, elem_base, idx_base, elem_ext, idx_ext, relation_type): + self.elem_base = elem_base + self.idx_base = idx_base + self.elem_ext = elem_ext + self.idx_ext = idx_ext + self.type = relation_type # TODO: Rename to cor_id + +class BtMeshComp: + def __init__(self): + self.mod_rel_list = [] + + def add_relation(self, elem_base, idx_base, elem_ext, idx_ext, relation_type): + self.mod_rel_list.append(ModRelation(elem_base, idx_base, elem_ext, idx_ext, relation_type)) + + def bt_mesh_model_correspond(self, corresponding_mod, base_mod): + cor_id = 0 + for rel in self.mod_rel_list: + if (self.is_model_base(base_mod, rel) or self.is_model_extension(base_mod, rel) or + self.is_model_base(corresponding_mod, rel) or self.is_model_extension(corresponding_mod, rel)) and rel.type < 0xFF: + cor_id = rel.type + break + else: + cor_id = max((rel.type for rel in self.mod_rel_list if rel.type < 0xFF), default=0) + 1 + self.add_relation(base_mod['elem_idx'], base_mod['mod_idx'] + base_mod.get('vnd_offset', 0), + corresponding_mod['elem_idx'], corresponding_mod['mod_idx'] + corresponding_mod.get('vnd_offset', 0), + cor_id) + + def bt_mesh_model_extend(self, extending_mod, base_mod): + self.add_relation(base_mod['elem_idx'], base_mod['mod_idx'] + base_mod.get('vnd_offset', 0), + extending_mod['elem_idx'], extending_mod['mod_idx'] + extending_mod.get('vnd_offset', 0), + 0xFF) + + def is_model_base(self, model, rel): + return model['elem_idx'] == rel.elem_base and (model['mod_idx'] + model.get('vnd_offset', 0)) == rel.idx_base + + def is_model_extension(self, model, rel): + return model['elem_idx'] == rel.elem_ext and (model['mod_idx'] + model.get('vnd_offset', 0)) == rel.idx_ext + + def count_mod_ext(self, model): + offset_record = 0 + extensions = 0 + + for rel in self.mod_rel_list: + if self.is_model_extension(model, rel) and rel.type == 0xFF: + extensions += 1 + + offset = rel.elem_ext - rel.elem_base + if (abs(offset) > abs(offset_record)): + offset_record = offset + + return offset_record, extensions + + def is_cor_present(self, model): + for rel in self.mod_rel_list: + if (self.is_model_base(model, rel) or self.is_model_extension(model, rel)) and rel.type < 0xFF: + return True, rel.type + + return False, None + + def prep_model_item_header(self, model, buf): + offset_record, ext_mod_cnt = self.count_mod_ext(model) + cor_present, cor_id = self.is_cor_present(model) + + mod_elem_info = ext_mod_cnt << 2 + if ext_mod_cnt > 31 or offset_record < -4 or offset_record > 3: + mod_elem_info |= 0b10 + + if cor_present: + mod_elem_info |= 0b01 + + self.data_buf_add_u8_offset(buf, mod_elem_info) + + if cor_present: + self.data_buf_add_u8_offset(buf, cor_id) + + return ext_mod_cnt + + def data_buf_add_u8_offset(self, buf, val): + # TODO: offset is not supported yet + buf.append(val) + + def add_items_to_page(self, buf, model, ext_mod_cnt): + for rel in self.mod_rel_list: + if not (self.is_model_extension(model, rel) and rel.type == 0xFF): + continue + + elem_offset = model['elem_idx'] - rel.elem_base + mod_idx = rel.idx_base + + if ext_mod_cnt < 32 and elem_offset < 4 and elem_offset > -5: + # short format + if elem_offset < 0: + elem_offset += 8 + + elem_offset |= mod_idx << 3 + self.data_buf_add_u8_offset(buf, elem_offset) + else: + # long format + if elem_offset < 0: + elem_offset += 256 + + self.data_buf_add_u8_offset(buf, elem_offset) + self.data_buf_add_u8_offset(buf, mod_idx) + + def bt_mesh_comp_data_get_page_1(self, comp): + # TODO: offset is not supported yet + + binary_output = bytearray() + + for elem_idx, elem in enumerate(comp): + buf = bytearray() + + sig = elem.get('sig', []) + vnd = elem.get('vnd', []) + + buf = struct.pack( + " len(sig): + return len(sig) + + return 0 + +# Preprocess composition array +for elem_idx, elem in enumerate(comp): + sig, vnd = elem.get('sig', []), elem.get('vnd', []) + vnd_offset = len(sig) + + for model_idx, model in enumerate(sig): + model['elem_idx'] = elem_idx + model['mod_idx'] = model_idx + model['vnd_offset'] = vnd_offset + + if 'cor' in model: + for cor in model['cor']: + cor['vnd_offset'] = get_vnd_offset(comp, cor) + + bt_mesh.bt_mesh_model_correspond(cor, model) + + if 'ext' in model: + for ext in model['ext']: + ext['vnd_offset'] = get_vnd_offset(comp, ext) + bt_mesh.bt_mesh_model_extend(ext, model) + +# Generate page 1 binary data +page_data = bt_mesh.bt_mesh_comp_data_get_page_1(comp) +print(page_data) + diff --git a/samples/bluetooth/mesh/ngcdp/cdp_dts_parse.py b/samples/bluetooth/mesh/ngcdp/cdp_dts_parse.py new file mode 100644 index 000000000000..44a276d94f1c --- /dev/null +++ b/samples/bluetooth/mesh/ngcdp/cdp_dts_parse.py @@ -0,0 +1,88 @@ +import os +import tempfile +import devicetree.edtlib as edtlib + +# Path to your DTS file +dts_file = "ble_mesh.overlay" +#dts_file = "build/ngcdp/zephyr/zephyr.dts" + +# Read the original content +with open(dts_file, "r") as original_file: + original_content = original_file.read() + +# Prepend "/dts-v1/;" and create an temporary file +# "/dts-v1/;" is required for the edtlib.EDT to work +with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".dts") as temp_file: + temp_file.write("/dts-v1/;\n" + original_content) + temp_file_path = temp_file.name + +# Path to the include directories (if applicable) +# Usually, the Zephyr project or your specific project includes these. + +# All bindings-dirs in Zephyr can be found in build//build_info.yml +include_dirs = ['/home/pavel/nordic/ncs/git-repos/ncs/nrf/dts/bindings', + '/home/pavel/nordic/ncs/git-repos/ncs/nrf/samples/bluetooth/mesh/ngcdp/dts/bindings', + '/home/pavel/nordic/ncs/git-repos/ncs/zephyr/dts/bindings'] + +# Create an EDT instance +try: + #edt = edtlib.EDT(dts_file, include_dirs) + edt = edtlib.EDT(temp_file_path, include_dirs) +except edtlib.EDTError as e: + print(f"EDT Error: {e}") + exit(1) + +os.remove(temp_file_path) + +# Access and process the parsed data +def parse_mesh_nodes(edt): + comp = list() + + # Find the mesh node + mesh_node = None + + for node in edt.nodes: + if node.name == "mesh": + mesh_node = node + break + + if mesh_node is None: + print("Mesh node not found!") + return + + print(f"Parsing 'mesh' node: {mesh_node.path}") + + # Find elements within the mesh + elements = mesh_node.children['elements'] + for element_name, element_node in elements.children.items(): + print(f"\nElement: {element_name}") + print(f" Location: {element_node.props.get('location').val}") + print(f" Compatible: {element_node.props.get('compatible').val}") + + parsed_models = dict() + sig_models = list() + + # Access models within the element + models = element_node.children['models'] + for model_name, model_node in models.children.items(): + print(f" Model: {model_name}") + for prop_name, prop in model_node.props.items(): + print(f" {prop_name}: {prop.val}") + + # Add the model to the list of SIG models + sig_models.append({"ext":[],"cor":[]}) + + parsed_models["sig"] = sig_models + # Can also be skipped + parsed_models["vnd"] = list() + + # Add the element to the composition + comp.append(parsed_models) + + return comp + +# Call the parser function +comp = parse_mesh_nodes(edt) + +# Print the composition +print(comp)