From ed057b0cd94aee40ff318e987e65a6695cedd020 Mon Sep 17 00:00:00 2001 From: YektaY Date: Thu, 6 Mar 2025 10:48:55 -0800 Subject: [PATCH] added in a pv search widget along with drag and drop to the var_table and obj_table. Added a error message if the search button is clicked before the env is set. --- .../environments/sphere_2d/__init__.py | 10 + src/badger/environment.py | 3 + .../gui/acr/components/archive_search.py | 232 ++++++++++ src/badger/gui/acr/components/env_cbox.py | 12 +- src/badger/gui/acr/components/obj_table.py | 388 ++++++++++++++++ src/badger/gui/acr/components/routine_page.py | 15 + src/badger/gui/acr/components/var_table.py | 426 ++++++++++++++++++ 7 files changed, 1084 insertions(+), 2 deletions(-) create mode 100644 src/badger/gui/acr/components/archive_search.py create mode 100644 src/badger/gui/acr/components/obj_table.py create mode 100644 src/badger/gui/acr/components/var_table.py diff --git a/src/badger/built_in_plugins/environments/sphere_2d/__init__.py b/src/badger/built_in_plugins/environments/sphere_2d/__init__.py index 8bcb9fe0..af8801da 100644 --- a/src/badger/built_in_plugins/environments/sphere_2d/__init__.py +++ b/src/badger/built_in_plugins/environments/sphere_2d/__init__.py @@ -1,4 +1,6 @@ from badger import environment +from PyQt5.QtNetwork import QNetworkRequest +from PyQt5.QtCore import QUrl class Environment(environment.Environment): @@ -33,3 +35,11 @@ def set_variables(self, variable_inputs: dict[str, float]): def get_observables(self, observable_names): return {k: self._observations[k] for k in observable_names} + + def search(self, keyword, archive_url): + url_string = ( + f"{archive_url}" f"retrieval/bpl/searchForPVsRegex?regex=.*{keyword}.*" + ) + request = QNetworkRequest(QUrl(url_string)) + + return request diff --git a/src/badger/environment.py b/src/badger/environment.py index ee21cf5f..0031a8e8 100644 --- a/src/badger/environment.py +++ b/src/badger/environment.py @@ -122,6 +122,9 @@ def set_variable(self, variable_name, variable_value): def get_bound(self, variable_name): return self.get_bounds([variable_name])[variable_name] + def search(self, keyword=None, archive_url=None): + return None + ############################################################ # Expert level of customization ############################################################ diff --git a/src/badger/gui/acr/components/archive_search.py b/src/badger/gui/acr/components/archive_search.py new file mode 100644 index 00000000..5880f174 --- /dev/null +++ b/src/badger/gui/acr/components/archive_search.py @@ -0,0 +1,232 @@ +import logging +from typing import List +from PyQt5.QtGui import QDrag, QKeyEvent +from PyQt5.QtCore import ( + QAbstractTableModel, + QMimeData, + QModelIndex, + QObject, + Qt, + QVariant, + pyqtSignal, +) +from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply +from PyQt5.QtWidgets import ( + QAbstractItemView, + QHBoxLayout, + QHeaderView, + QLabel, + QLineEdit, + QPushButton, + QTableView, + QVBoxLayout, + QWidget, +) + +logger = logging.getLogger(__name__) + + +class ArchiveResultsTableModel(QAbstractTableModel): + """This table model holds the results of an archiver appliance PV search. This search is for names matching + the input search words, and the results are a list of PV names that match that search. + + Parameters + ---------- + parent : QObject, optional + The parent item of this table + """ + + def __init__(self, parent: QObject = None) -> None: + super().__init__(parent=parent) + + self.results_list = [] + self.column_names = ("PV",) + + def rowCount(self, parent: QObject) -> int: + """Return the row count of the table""" + if parent is not None and parent.isValid(): + return 0 + return len(self.results_list) + + def columnCount(self, parent: QObject) -> int: + """Return the column count of the table""" + if parent is not None and parent.isValid(): + return 0 + return len(self.column_names) + + def data(self, index: QModelIndex, role: int) -> QVariant: + """Return the data for the associated role. Currently only supporting DisplayRole.""" + if not index.isValid(): + return QVariant() + + if role != Qt.DisplayRole: + return QVariant() + + return self.results_list[index.row()] + + def headerData(self, section, orientation, role=Qt.DisplayRole) -> QVariant: + """Return data associated with the header""" + if role != Qt.DisplayRole: + return super().headerData(section, orientation, role) + + return str(self.column_names[section]) + + def flags(self, index: QModelIndex) -> Qt.ItemFlags: + """Return flags that determine how users can interact with the items in the table""" + if index.isValid(): + return Qt.ItemIsEnabled | Qt.ItemIsSelectable | Qt.ItemIsDragEnabled + + def append(self, pv: str) -> None: + """Appends a row to this table given the PV name as input""" + self.beginInsertRows( + QModelIndex(), len(self.results_list), len(self.results_list) + ) + self.results_list.append(pv) + self.endInsertRows() + self.layoutChanged.emit() + + def replace_rows(self, pvs: List[str]) -> None: + """Overwrites any existing rows in the table with the input list of PV names""" + self.beginInsertRows(QModelIndex(), 0, len(pvs) - 1) + self.results_list = pvs + self.endInsertRows() + self.layoutChanged.emit() + + def clear(self) -> None: + """Clear out all data stored in this table""" + self.beginRemoveRows(QModelIndex(), 0, len(self.results_list)) + self.results_list = [] + self.endRemoveRows() + self.layoutChanged.emit() + + def sort(self, col: int, order=Qt.AscendingOrder) -> None: + """Sort the table by PV name""" + self.results_list.sort(reverse=order == Qt.DescendingOrder) + self.layoutChanged.emit() + + +class ArchiveSearchWidget(QWidget): + """ + The ArchiveSearchWidget is a display widget for showing the results of a PV search using an instance of the + EPICS archiver appliance. Currently the only type of search supported is for PV names matching an input search + string, though this can be extended in the future. + + Parameters + ---------- + parent : QObject, optional + The parent item of this widget + """ + + append_PVs_requested = pyqtSignal(str) + + def __init__(self, environment, parent: QObject = None) -> None: + super().__init__(parent=parent) + self.env = environment + self.network_manager = QNetworkAccessManager() + self.network_manager.finished.connect(self.populate_results_list) + + self.resize(400, 800) + self.layout = QVBoxLayout() + + self.archive_title_label = QLabel("Archive URL:") + self.archive_url_textedit = QLineEdit("http://lcls-archapp.slac.stanford.edu/") + self.archive_url_textedit.setFixedWidth(250) + self.archive_url_textedit.setFixedHeight(25) + + self.search_label = QLabel("Pattern:") + self.search_box = QLineEdit() + self.search_button = QPushButton("Search") + self.search_button.setDefault(True) + self.search_button.clicked.connect(self.request_archiver_info) + + self.loading_label = QLabel("Loading...") + self.loading_label.hide() + + self.results_table_model = ArchiveResultsTableModel() + self.results_view = QTableView(self) + self.results_view.setModel(self.results_table_model) + self.results_view.setProperty("showDropIndicator", False) + self.results_view.setDragDropOverwriteMode(False) + self.results_view.setDragEnabled(True) + self.results_view.setSelectionMode(QAbstractItemView.ExtendedSelection) + self.results_view.setSelectionBehavior(QAbstractItemView.SelectRows) + self.results_view.setDropIndicatorShown(True) + self.results_view.setCornerButtonEnabled(False) + self.results_view.setSortingEnabled(True) + self.results_view.verticalHeader().setVisible(False) + self.results_view.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) + self.results_view.startDrag = self.startDragAction + + self.archive_url_layout = QHBoxLayout() + self.archive_url_layout.addWidget(self.archive_title_label) + self.archive_url_layout.addWidget(self.archive_url_textedit) + self.layout.addLayout(self.archive_url_layout) + self.search_layout = QHBoxLayout() + self.search_layout.addWidget(self.search_label) + self.search_layout.addWidget(self.search_box) + self.search_layout.addWidget(self.search_button) + self.layout.addLayout(self.search_layout) + self.layout.addWidget(self.loading_label) + self.layout.addWidget(self.results_view) + # self.insert_button = QPushButton("Add PVs") + # self.insert_button.clicked.connect( + # lambda: self.append_PVs_requested.emit(self.selectedPVs()) + # ) + self.results_view.doubleClicked.connect( + lambda: self.append_PVs_requested.emit(self.selectedPVs()) + ) + # self.layout.addWidget(self.insert_button) + self.setLayout(self.layout) + + def selectedPVs(self) -> str: + """Figure out based on which indexes were selected, the list of PVs (by string name) + The user was hoping to insert into the table. Concatenate them into string form i.e. + , , """ + indices = self.results_view.selectedIndexes() + pv_list = "" + for index in indices: + pv_name = self.results_table_model.results_list[index.row()] + pv_list += pv_name + ", " + return pv_list[:-2] + + def startDragAction(self, supported_actions) -> None: + """ + The method to be called when a user initiates a drag action for one of the results in the table. The current + reason for this functionality is the ability to drag a PV name onto a plot to automatically start drawing + data for that PV + """ + drag = QDrag(self) + mime_data = QMimeData() + mime_data.setText(self.selectedPVs()) + drag.setMimeData(mime_data) + drag.exec_() + + def keyPressEvent(self, e: QKeyEvent) -> None: + """Special key press tracker, just so that if enter or return is pressed the formula dialog attempts to submit the formula""" + if e.key() == Qt.Key_Return or e.key() == Qt.Key_Enter: + self.request_archiver_info() + return super().keyPressEvent(e) + + def request_archiver_info(self) -> None: + """ """ + search_text = self.search_box.text() + search_text = search_text.replace("?", ".") + request = self.env.search(search_text, self.archive_url_textedit.text()) + + if request is None: + request = "" + + self.network_manager.get(request) + self.loading_label.show() + + def populate_results_list(self, reply: QNetworkReply) -> None: + """Slot called when the archiver appliance returns search results. Will populate the table with the results""" + self.loading_label.hide() + if reply.error() == QNetworkReply.NoError: + self.results_table_model.clear() + bytes_str = reply.readAll() + pv_list = str(bytes_str, "utf-8").split() + self.results_table_model.replace_rows(pv_list) + else: + logger.error(f"Could not retrieve archiver results due to: {reply.error()}") + reply.deleteLater() diff --git a/src/badger/gui/acr/components/env_cbox.py b/src/badger/gui/acr/components/env_cbox.py index 6e88544e..3bbf3ebd 100644 --- a/src/badger/gui/acr/components/env_cbox.py +++ b/src/badger/gui/acr/components/env_cbox.py @@ -14,9 +14,10 @@ ) from PyQt5.QtCore import QRegExp, QPropertyAnimation +from badger.gui.acr.components.archive_search import ArchiveSearchWidget from badger.gui.default.components.collapsible_box import CollapsibleBox -from badger.gui.default.components.var_table import VariableTable -from badger.gui.default.components.obj_table import ObjectiveTable +from badger.gui.acr.components.var_table import VariableTable +from badger.gui.acr.components.obj_table import ObjectiveTable from badger.gui.default.components.data_table import init_data_table from badger.settings import init_settings from badger.gui.default.utils import ( @@ -114,6 +115,8 @@ def init_ui(self): btn_env_play.setFixedSize(128, 24) if not strtobool(config_singleton.read_value("BADGER_ENABLE_ADVANCED")): btn_env_play.hide() + self.btn_pv = btn_pv = QPushButton("PV Search") + btn_pv.setFixedSize(128, 24) self.btn_docs = btn_docs = QPushButton("Open Docs") btn_docs.setFixedSize(96, 24) self.btn_params = btn_params = QPushButton("Parameters") @@ -125,6 +128,7 @@ def init_ui(self): hbox_name.addWidget(cb, 1) hbox_name.addWidget(btn_env_play) hbox_name.addWidget(btn_params) + hbox_name.addWidget(btn_pv) hbox_name.addWidget(btn_docs) vbox.addWidget(name) @@ -508,3 +512,7 @@ def update_stylesheets(self, environment=""): else: stylesheet = "" self.setStyleSheet(stylesheet) + + def archiveSearchMenu(self): + self.archive_search = ArchiveSearchWidget() + self.archive_search.show() diff --git a/src/badger/gui/acr/components/obj_table.py b/src/badger/gui/acr/components/obj_table.py new file mode 100644 index 00000000..3cfa5ae6 --- /dev/null +++ b/src/badger/gui/acr/components/obj_table.py @@ -0,0 +1,388 @@ +from typing import Any, List, Dict, Optional +from PyQt5.QtWidgets import ( + QTableWidget, + QTableWidgetItem, + QHeaderView, + QAbstractItemView, + QCheckBox, + QComboBox, + QStyledItemDelegate, +) +from PyQt5.QtGui import QDropEvent, QDragEnterEvent, QDragMoveEvent + + +class ObjectiveTable(QTableWidget): + """ + A custom QTableWidget for displaying and managing objectives with associated rules. + + This table supports: + - Displaying objectives along with a rule (e.g., MINIMIZE or MAXIMIZE). + - Toggling objectives via checkboxes. + - Batch updating and filtering based on the check status. + - Internal drag and drop to reorder rows. + - External drag and drop of text to add new objectives. + + Attributes + ---------- + all_objectives : List[Dict[str, str]] + The complete list of objectives. Each objective is a dictionary mapping + the objective name to its rule. + objectives : List[Dict[str, str]] + The currently displayed list of objectives. + selected : Dict[str, bool] + A dictionary tracking the selected (checked) status of each objective. + rules : Dict[str, str] + A dictionary tracking the rule associated with each objective. + checked_only : bool + Flag indicating whether only checked objectives should be displayed. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """ + Initialize the ObjectiveTable widget. + + Parameters + ---------- + *args : Any + Variable length argument list for QTableWidget. + **kwargs : Any + Arbitrary keyword arguments for QTableWidget. + """ + super().__init__(*args, **kwargs) + + # Enable row reordering via internal drag and drop. + self.setSelectionBehavior(QAbstractItemView.SelectRows) + self.setSelectionMode(QAbstractItemView.SingleSelection) + self.setShowGrid(False) + self.setDragDropMode(QAbstractItemView.InternalMove) + self.setDragDropOverwriteMode(False) + self.setAcceptDrops(True) + + self.setRowCount(0) + self.setColumnCount(3) + self.setAlternatingRowColors(True) + self.setStyleSheet("alternate-background-color: #262E38;") + self.setEditTriggers(QAbstractItemView.NoEditTriggers) + + self.verticalHeader().setVisible(False) + header = self.horizontalHeader() + header.setSectionResizeMode(0, QHeaderView.Fixed) + header.setSectionResizeMode(1, QHeaderView.Stretch) + self.setColumnWidth(0, 20) + self.setColumnWidth(2, 192) + + self.all_objectives: List[Dict[str, str]] = [] + self.objectives: List[Dict[str, str]] = [] + self.selected: Dict[str, bool] = {} # Track objective selected status. + self.rules: Dict[str, str] = {} # Track objective rules. + self.checked_only: bool = False + + self.config_logic() + + def config_logic(self) -> None: + """ + Configure signal connections and internal logic. + """ + self.horizontalHeader().sectionClicked.connect(self.header_clicked) + + def dragEnterEvent(self, event: QDragEnterEvent) -> None: + """ + Accept the drag event if it contains text or originates from within the table. + + Parameters + ---------- + event : QDragEnterEvent + The drag enter event. + """ + # Accept internal moves (reordering) or external text drops. + if event.source() == self or event.mimeData().hasText(): + event.acceptProposedAction() + else: + event.ignore() + + def dragMoveEvent(self, event: QDragMoveEvent) -> None: + """ + Continue accepting the drag move event if it contains text or is internal. + + Parameters + ---------- + event : QDragMoveEvent + The drag move event. + """ + if event.source() == self or event.mimeData().hasText(): + event.acceptProposedAction() + else: + event.ignore() + + def dropEvent(self, event: QDropEvent) -> None: + """ + Handle drop events. + + If the drop originates from within the table (i.e. internal move), + the default row reordering behavior is used. If the drop is external + and contains text, the text is parsed to create new objectives. + Each dropped line is interpreted as an objective name, with an optional + tab-delimited rule (defaulting to "MINIMIZE" if not provided). + + Parameters + ---------- + event : QDropEvent + The drop event. + """ + if event.source() == self: + # Internal move: allow default behavior for reordering rows. + super().dropEvent(event) + elif event.mimeData().hasText(): + text: str = event.mimeData().text() + lines = text.splitlines() + for line in lines: + parts = line.split("\t") + name = parts[0].strip() + # If a rule is provided, use it; otherwise, default to "MINIMIZE". + rule = parts[1].strip() if len(parts) > 1 else "MINIMIZE" + # Append the new objective. + self.all_objectives.append({name: rule}) + self.objectives = self.all_objectives + self.update_objectives(self.objectives, filtered=0) + event.acceptProposedAction() + else: + event.ignore() + + def is_all_checked(self) -> bool: + """ + Check if all objectives are selected. + + Returns + ------- + bool + True if every objective's checkbox is checked, False otherwise. + """ + for i in range(self.rowCount()): + widget = self.cellWidget(i, 0) + if widget is not None and not widget.isChecked(): + return False + return True + + def header_clicked(self, idx: int) -> None: + """ + Toggle the selection of all objectives when the first header is clicked. + + Parameters + ---------- + idx : int + The index of the clicked header section. This method only acts if idx is 0. + """ + if idx != 0: + return + + all_checked = self.is_all_checked() + + for i in range(self.rowCount()): + widget = self.cellWidget(i, 0) + if widget is not None: + widget.blockSignals(True) + widget.setChecked(not all_checked) + widget.blockSignals(False) + self.update_selected(0) + + def update_rules(self) -> None: + """ + Update the internal rules dictionary based on the current selections. + + Iterates through all rows and updates each objective's rule based on the value + of the combo box in the third column. + """ + for i in range(self.rowCount()): + name_item = self.item(i, 1) + if name_item is None: + continue + name = name_item.text() + rule_widget = self.cellWidget(i, 2) + if rule_widget is not None: + self.rules[name] = rule_widget.currentText() + + def set_rules(self, objectives: Dict[str, str]) -> None: + """ + Set the rules for objectives. + + Parameters + ---------- + objectives : Dict[str, str] + A dictionary mapping objective names to their corresponding rules. + """ + for name in objectives: + self.rules[name] = objectives[name] + self.update_objectives(self.objectives, filtered=2) + + def update_selected(self, _: int) -> None: + """ + Update the internal selected dictionary based on checkbox states. + + Parameters + ---------- + _ : int + A dummy parameter typically representing the checkbox state change. + """ + for i in range(self.rowCount()): + checkbox = self.cellWidget(i, 0) + name_item = self.item(i, 1) + if checkbox is not None and name_item is not None: + name = name_item.text() + self.selected[name] = checkbox.isChecked() + + if self.checked_only: + self.show_checked_only() + + def set_selected(self, objective_names: List[str]) -> None: + """ + Set the selected status for given objectives. + + Parameters + ---------- + objective_names : List[str] + A list of objective names to mark as selected. + """ + self.selected = {} + for name in objective_names: + self.selected[name] = True + + self.update_objectives(self.objectives, filtered=2) + + def toggle_show_mode(self, checked_only: bool) -> None: + """ + Toggle the display mode between showing all objectives and only checked ones. + + Parameters + ---------- + checked_only : bool + If True, display only checked objectives; otherwise, display all. + """ + self.checked_only = checked_only + if checked_only: + self.show_checked_only() + else: + self.show_all() + + def show_checked_only(self) -> None: + """ + Filter and display only the objectives that are checked. + """ + checked_objectives: List[Dict[str, str]] = [] + for obj in self.objectives: + name = next(iter(obj)) + if self.is_checked(name): + checked_objectives.append(obj) + self.update_objectives(checked_objectives, filtered=2) + + def show_all(self) -> None: + """ + Display all objectives. + """ + self.update_objectives(self.objectives, filtered=2) + + def is_checked(self, name: str) -> bool: + """ + Check if the given objective is selected. + + Parameters + ---------- + name : str + The name of the objective. + + Returns + ------- + bool + True if the objective is checked, False otherwise. + """ + try: + return self.selected[name] + except KeyError: + return False + + def update_objectives( + self, objectives: Optional[List[Dict[str, str]]], filtered: int = 0 + ) -> None: + """ + Update the table with the given objectives. + + The update behavior depends on the value of the filtered parameter: + - filtered = 0: Fully refresh the table. + - filtered = 1: Update based on a keyword filter. + - filtered = 2: Re-render based on the current check status. + + Parameters + ---------- + objectives : Optional[List[Dict[str, str]]] + A list of objectives to display. Each objective is a dictionary with a single + key-value pair mapping the objective name to its rule. + filtered : int, optional + The filter mode (0, 1, or 2), by default 0. + """ + self.setRowCount(0) + self.horizontalHeader().setVisible(False) + + if filtered == 0: + self.all_objectives = objectives or [] + self.objectives = self.all_objectives + self.selected = {} + self.rules = {} + for obj in self.objectives: + name = next(iter(obj)) + self.rules[name] = obj[name] + elif filtered == 1: + self.objectives = objectives or [] + + if not objectives: + return + + current_objectives: List[Dict[str, str]] = [] + if self.checked_only: + for obj in objectives: + name = next(iter(obj)) + if self.is_checked(name): + current_objectives.append(obj) + else: + current_objectives = objectives + + n = len(current_objectives) + self.setRowCount(n) + for i, obj in enumerate(current_objectives): + name = next(iter(obj)) + + # Create and set checkbox for objective selection. + checkbox = QCheckBox() + self.setCellWidget(i, 0, checkbox) + checkbox.setChecked(self.is_checked(name)) + checkbox.stateChanged.connect(self.update_selected) + + # Set the objective name. + self.setItem(i, 1, QTableWidgetItem(name)) + + # Create and set the rule combo box. + _rule = self.rules.get(name, "MINIMIZE") + cb_rule = QComboBox() + cb_rule.setItemDelegate(QStyledItemDelegate()) + cb_rule.addItems(["MINIMIZE", "MAXIMIZE"]) + cb_rule.setCurrentIndex(0 if _rule == "MINIMIZE" else 1) + cb_rule.currentIndexChanged.connect(self.update_rules) + self.setCellWidget(i, 2, cb_rule) + + self.setHorizontalHeaderLabels(["", "Name", "Rule"]) + self.setVerticalHeaderLabels([str(i) for i in range(n)]) + self.horizontalHeader().setVisible(True) + + def export_objectives(self) -> Dict[str, str]: + """ + Export the selected objectives along with their rules. + + Returns + ------- + Dict[str, str] + A dictionary mapping the name of each selected objective to its rule. + """ + objectives_exported: Dict[str, str] = {} + for obj in self.all_objectives: + name = next(iter(obj)) + if self.is_checked(name): + objectives_exported[name] = self.rules.get(name, "MINIMIZE") + return objectives_exported diff --git a/src/badger/gui/acr/components/routine_page.py b/src/badger/gui/acr/components/routine_page.py index 045a36f4..b5ef0e6f 100644 --- a/src/badger/gui/acr/components/routine_page.py +++ b/src/badger/gui/acr/components/routine_page.py @@ -35,6 +35,7 @@ from badger.gui.default.windows.add_random_dialog import BadgerAddRandomDialog from badger.gui.default.windows.message_dialog import BadgerScrollableMessageBox from badger.gui.default.utils import filter_generator_config +from badger.gui.acr.components.archive_search import ArchiveSearchWidget from badger.archive import update_run from badger.environment import instantiate_env from badger.errors import BadgerRoutineError @@ -214,6 +215,7 @@ def config_logic(self): self.generator_box.btn_edit_script.clicked.connect(self.edit_script) self.env_box.cb.currentIndexChanged.connect(self.select_env) self.env_box.btn_env_play.clicked.connect(self.open_playground) + self.env_box.btn_pv.clicked.connect(self.open_archive_search) self.env_box.btn_docs.clicked.connect(self.open_environment_docs) self.env_box.btn_add_var.clicked.connect(self.add_var) self.env_box.btn_lim_vrange.clicked.connect(self.limit_variable_ranges) @@ -673,6 +675,19 @@ def open_generator_docs(self): def open_environment_docs(self): self.window_env_docs.show() + def open_archive_search(self): + if not hasattr(self, "archive_search") or not self.archive_search.isVisible(): + try: + env = self.create_env() + except AttributeError: + raise BadgerRoutineError("No environment selected!") + + self.archive_search = ArchiveSearchWidget(environment=env) + self.archive_search.show() + else: + self.archive_search.raise_() + self.archive_search.activateWindow() + def add_var(self): # TODO: Use a cached env env_params = load_config(self.env_box.edit.toPlainText()) diff --git a/src/badger/gui/acr/components/var_table.py b/src/badger/gui/acr/components/var_table.py new file mode 100644 index 00000000..792217a8 --- /dev/null +++ b/src/badger/gui/acr/components/var_table.py @@ -0,0 +1,426 @@ +from PyQt5.QtWidgets import ( + QTableWidget, + QTableWidgetItem, + QHeaderView, + QCheckBox, + QMessageBox, + QAbstractItemView, +) +from PyQt5.QtCore import pyqtSignal, Qt +from PyQt5.QtGui import QColor +from badger.gui.default.components.robust_spinbox import RobustSpinBox + +from badger.environment import instantiate_env +from badger.errors import BadgerInterfaceChannelError + + +class VariableTable(QTableWidget): + sig_sel_changed = pyqtSignal() + sig_pv_added = pyqtSignal() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.setAcceptDrops(True) + self.setDragEnabled(True) + self.setDragDropMode(QAbstractItemView.DragDrop) + self.setDefaultDropAction(Qt.MoveAction) + + # Reorder the rows by dragging around + # self.setSelectionBehavior(self.SelectRows) + # self.setSelectionMode(self.SingleSelection) + # self.setShowGrid(False) + # self.setDragDropMode(self.InternalMove) + # self.setDragDropOverwriteMode(False) + + self.setRowCount(0) + self.setColumnCount(4) + self.setAlternatingRowColors(True) + self.setStyleSheet("alternate-background-color: #262E38;") + # self.setEditTriggers(QAbstractItemView.NoEditTriggers) + + self.verticalHeader().setVisible(False) + header = self.horizontalHeader() + # header.setSectionResizeMode(0, QHeaderView.ResizeToContents) + header.setSectionResizeMode(0, QHeaderView.Fixed) + header.setSectionResizeMode(1, QHeaderView.Stretch) + # header.setSectionResizeMode(2, QHeaderView.ResizeToContents) + # header.setSectionResizeMode(3, QHeaderView.ResizeToContents) + self.setColumnWidth(0, 20) + self.setColumnWidth(2, 96) + self.setColumnWidth(3, 96) + + self.all_variables = [] # store all variables + self.variables = [] # store variables to be displayed + self.selected = {} # track var selected status + self.bounds = {} # track var bounds + self.checked_only = False + self.bounds_locked = False + self.addtl_vars = [] # track variables added on the fly + self.env_class = None # needed to get bounds on the fly + self.env = None # needed to get bounds on the fly + self.configs = None # needed to get bounds on the fly + self.previous_values = {} # to track changes in table + self.config_logic() + + def config_logic(self): + self.horizontalHeader().sectionClicked.connect(self.header_clicked) + # Catch if any item gets changed + self.itemChanged.connect(self.add_additional_variable) + + def setItem(self, row, column, item): + text = item.text() + if text != "Enter new PV here....": + self.previous_values[(row, column)] = item.text() + super().setItem(row, column, item) + + def is_all_checked(self): + for i in range(self.rowCount() - 1): + item = self.cellWidget(i, 0) + if not item.isChecked(): + return False + + return True + + def header_clicked(self, idx): + if idx: + return + + all_checked = self.is_all_checked() + + for i in range(self.rowCount() - 1): + item = self.cellWidget(i, 0) + # Doing batch update + item.blockSignals(True) + item.setChecked(not all_checked) + item.blockSignals(False) + self.update_selected(0) + + def update_bounds(self): + for i in range(self.rowCount() - 1): + name = self.item(i, 1).text() + sb_lower = self.cellWidget(i, 2) + sb_upper = self.cellWidget(i, 3) + self.bounds[name] = [sb_lower.value(), sb_upper.value()] + + def set_bounds(self, variables: dict, signal=True): + for name in variables: + self.bounds[name] = variables[name] + + if signal: + self.update_variables(self.variables, 2) + else: + self.update_variables(self.variables, 3) + + def update_selected(self, _): + for i in range(self.rowCount() - 1): + _cb = self.cellWidget(i, 0) + name = self.item(i, 1).text() + if name != "Enter new PV here....": # TODO: fix... + self.selected[name] = _cb.isChecked() + + self.sig_sel_changed.emit() + + if self.checked_only: + self.show_checked_only() + + def set_selected(self, variable_names): + self.selected = {} + for vname in variable_names: + self.selected[vname] = True + + self.update_variables(self.variables, 2) + + def toggle_show_mode(self, checked_only): + self.checked_only = checked_only + if checked_only: + self.show_checked_only() + else: + self.show_all() + + def show_checked_only(self): + checked_variables = [] + for var in self.variables: + name = next(iter(var)) + if self.is_checked(name): + checked_variables.append(var) + self.update_variables(checked_variables, 2) + + def show_all(self): + self.update_variables(self.variables, 2) + + def is_checked(self, name): + try: + _checked = self.selected[name] + except KeyError: + _checked = False + + return _checked + + def update_variables(self, variables, filtered=0): + # filtered = 0: completely refresh + # filtered = 1: filtered by keyword + # filtered = 2: just rerender based on check status + # filtered = 3: same as 2 but do not emit the signal + + self.setRowCount(0) + self.horizontalHeader().setVisible(False) + + if not filtered: + self.all_variables = variables or [] + self.variables = self.all_variables[:] # make a copy + self.selected = {} + self.bounds = {} + self.addtl_vars = [] + for var in self.variables: + name = next(iter(var)) + self.bounds[name] = var[name] + elif filtered == 1: + self.variables = variables or [] + + if not variables: + return + + _variables = [] + if self.checked_only: + for var in variables: + name = next(iter(var)) + if self.is_checked(name): + _variables.append(var) + else: + _variables = variables + + n = len(_variables) + 1 + self.setRowCount(n) + self.previous_values = {} # to track changes in table + + for i, var in enumerate(_variables): + name = next(iter(var)) + vrange = var[name] + + self.setCellWidget(i, 0, QCheckBox()) + + _cb = self.cellWidget(i, 0) + _cb.setChecked(self.is_checked(name)) + _cb.stateChanged.connect(self.update_selected) + item = QTableWidgetItem(name) + if name in self.addtl_vars: + # Make new PVs a different color + item.setForeground(QColor("darkCyan")) + else: + # Make non-new PVs not editable + item.setFlags(item.flags() & ~Qt.ItemIsEditable) + self.setItem(i, 1, item) + + _bounds = self.bounds[name] + sb_lower = RobustSpinBox( + default_value=_bounds[0], lower_bound=vrange[0], upper_bound=vrange[1] + ) + sb_lower.valueChanged.connect(self.update_bounds) + sb_upper = RobustSpinBox( + default_value=_bounds[1], lower_bound=vrange[0], upper_bound=vrange[1] + ) + sb_upper.valueChanged.connect(self.update_bounds) + self.setCellWidget(i, 2, sb_lower) + self.setCellWidget(i, 3, sb_upper) + + if self.bounds_locked: + sb_lower.setEnabled(False) + sb_upper.setEnabled(False) + else: + sb_lower.setEnabled(True) + sb_upper.setEnabled(True) + + # Make extra editable row + item = QTableWidgetItem("Enter new PV here....") + item.setFlags(item.flags() | Qt.ItemIsEditable) + item.setForeground(QColor("gray")) + self.setItem(n - 1, 1, item) + + self.setHorizontalHeaderLabels(["", "Name", "Min", "Max"]) + self.setVerticalHeaderLabels([str(i) for i in range(n)]) + + header = self.horizontalHeader() + # header.setSectionResizeMode(0, QHeaderView.Interactive) + header.setVisible(True) + + if filtered != 3: + self.sig_sel_changed.emit() + + def add_additional_variable(self, item): + row = idx = item.row() + column = item.column() + name = item.text() + + if ( + row != self.rowCount() - 1 + and column == 1 + and name != "Enter new PV here...." + ): + # check that the new text is not equal to the previous value at that cell + prev_name = self.previous_values.get((row, column), "") + if name == prev_name: + return + else: + # delete row and additional variable + self.removeRow(row) + self.addtl_vars.remove(prev_name) + self.variables = [ + var for var in self.variables if next(iter(var)) != prev_name + ] + del self.bounds[prev_name] + del self.selected[prev_name] + + self.update_variables(self.variables, 2) + return + + if ( + row == self.rowCount() - 1 + and column == 1 + and name != "Enter new PV here...." + ): + # Check that variables doesn't already exist in table + if name in [list(d.keys())[0] for d in self.variables]: + self.update_variables(self.variables, 2) + QMessageBox.warning( + self, "Variable already exists!", f"Variable {name} already exists!" + ) + return + + # Get bounds from interface, if PV exists on interface + _bounds = None + if self.env_class is not None: + try: + _, _bounds = self.get_bounds(name) + vrange = _bounds + except BadgerInterfaceChannelError: + # Raised when PV does not exist after attempting to call value + # Revert table to previous state + self.update_variables(self.variables, 2) + QMessageBox.critical( + self, + "Variable Not Found!", + f"Variable {name} cannot be found through the interface!", + ) + return + # except Exception as e: # TODO: fix this + # print(e) + # # Raised when PV exists but value/hard limits cannot be found + # # Set to some default values + # _bounds = vrange = [-1, 1] + # QMessageBox.warning(self, 'Bounds could not be found!', + # f'Variable {name} bounds could not be found.' + + # 'Please check default values!' + # ) + else: + # TODO: handle this case? Right now I don't think it should happen + raise "Environment cannot be found for new variable bounds!" + + # Add checkbox only when a PV is entered + self.setCellWidget(idx, 0, QCheckBox()) + + _cb = self.cellWidget(idx, 0) + + # Checked by default when entered + _cb.setChecked(True) + self.selected[name] = True + + _cb.stateChanged.connect(self.update_selected) + + sb_lower = RobustSpinBox( + default_value=_bounds[0], lower_bound=vrange[0], upper_bound=vrange[1] + ) + sb_lower.valueChanged.connect(self.update_bounds) + sb_upper = RobustSpinBox( + default_value=_bounds[1], lower_bound=vrange[0], upper_bound=vrange[1] + ) + sb_upper.valueChanged.connect(self.update_bounds) + self.setCellWidget(idx, 2, sb_lower) + self.setCellWidget(idx, 3, sb_upper) + + self.add_variable(name, vrange[0], vrange[1]) + self.addtl_vars.append(name) + + self.update_variables(self.variables, 2) + + self.sig_pv_added.emit() # notify the routine page that a new PV has been added + + def get_bounds(self, name): + # TODO: move elsewhere? + self.env = instantiate_env(self.env_class, self.configs) + + value = self.env.get_variable(name) + bound = self.env._get_bounds([name])[name] + return value, bound + + def add_variable(self, name, lb, ub): + var = {name: [lb, ub]} + + self.all_variables.append(var) + self.variables.append(var) + self.bounds[name] = [lb, ub] + + def export_variables(self) -> dict: + variables_exported = {} + for var in self.all_variables: + name = next(iter(var)) + if self.is_checked(name): + variables_exported[name] = self.bounds[name] + + return variables_exported + + def lock_bounds(self): + self.bounds_locked = True + self.toggle_show_mode(self.checked_only) + + def unlock_bounds(self): + self.bounds_locked = False + self.toggle_show_mode(self.checked_only) + + def dragEnterEvent(self, event): + if event.mimeData().hasText(): + event.acceptProposedAction() + else: + event.ignore() + + def dragMoveEvent(self, event): + if event.mimeData().hasText(): + event.acceptProposedAction() + else: + event.ignore() + + def dropEvent(self, event): + if event.mimeData().hasText(): + text = event.mimeData().text() + strings = text.strip().split("\n") + + position = event.pos() + drop_row = self.rowAt(position.y()) + if drop_row == -1: + drop_row = self.rowCount() + + for i, string in enumerate(strings): + string = string.strip() + if not string: + continue + + row = drop_row + i + if row >= self.rowCount(): + self.insertRow(row) + + item = QTableWidgetItem(string) + self.setItem(row, 1, item) + + self.add_additional_variable(item) + + event.acceptProposedAction() + else: + event.ignore() + + def flags(self, index): + if not index.isValid(): + return Qt.ItemIsEnabled + flags = Qt.ItemIsSelectable | Qt.ItemIsEnabled + if index.column() == 1: + flags |= Qt.ItemIsEditable | Qt.ItemIsDropEnabled + return flags