From de53882ead6b057c99cfc6e9759a63467950a0e4 Mon Sep 17 00:00:00 2001 From: Kerem Date: Sat, 22 Nov 2025 10:46:50 +0100 Subject: [PATCH] i2c command added data logger ui is much more condncsed, redy for decoding --- configure_session_widget.py | 75 +++- database/database/ehinge.db | Bin 98304 -> 0 bytes database/ehinge.db | Bin 106496 -> 106496 bytes database/init_database.py | 22 +- i2c_old/i2c_command_editor.py | 76 ++++ i2c_old/i2c_logger_ui.py | 331 ++++++++++++++++++ i2c_old/i2c_logic.py | 639 ++++++++++++++++++++++++++++++++++ i2c_old/i2c_polling.py | 151 ++++++++ i2c_old/i2c_ui.py | 342 ++++++++++++++++++ run.py | 224 +++++++++++- session.py | 237 +++++++++---- session_widget.py | 59 +++- 12 files changed, 2038 insertions(+), 118 deletions(-) delete mode 100644 database/database/ehinge.db create mode 100644 i2c_old/i2c_command_editor.py create mode 100644 i2c_old/i2c_logger_ui.py create mode 100644 i2c_old/i2c_logic.py create mode 100644 i2c_old/i2c_polling.py create mode 100644 i2c_old/i2c_ui.py diff --git a/configure_session_widget.py b/configure_session_widget.py index 065b9a8..f508de3 100644 --- a/configure_session_widget.py +++ b/configure_session_widget.py @@ -98,19 +98,28 @@ class ConfigureSessionWidget(QWidget): # Add command section add_group = QGroupBox("Add Command") add_layout = QVBoxLayout() - + add_form = QFormLayout() + + # Command type selector + self.cmd_type_combo = QComboBox() + self.cmd_type_combo.addItem("UART", "uart") + self.cmd_type_combo.addItem("I2C", "i2c") + self.cmd_type_combo.currentIndexChanged.connect(self._on_command_type_changed) + add_form.addRow("Command Type:", self.cmd_type_combo) + + # Command selector (will be populated based on type) self.cmd_combo = QComboBox() - add_form.addRow("UART Command:", self.cmd_combo) - + add_form.addRow("Command:", self.cmd_combo) + self.delay_spin = QSpinBox() self.delay_spin.setRange(0, 60000) self.delay_spin.setValue(3000) self.delay_spin.setSuffix(" ms") add_form.addRow("Delay After:", self.delay_spin) - + add_layout.addLayout(add_form) - + btn_add_cmd = QPushButton("➕ Add to Sequence") btn_add_cmd.clicked.connect(self._on_add_command) add_layout.addWidget(btn_add_cmd) @@ -156,6 +165,14 @@ class ConfigureSessionWidget(QWidget): widget.setLayout(layout) return widget + def _on_command_type_changed(self): + """Load commands when type changes.""" + cmd_type = self.cmd_type_combo.currentData() + if cmd_type == "uart": + self._load_uart_commands() + else: + self._load_i2c_commands() + def _load_uart_commands(self): """Load UART commands into dropdown.""" self.cmd_combo.clear() @@ -164,6 +181,15 @@ class ConfigureSessionWidget(QWidget): ) for row in cursor.fetchall(): self.cmd_combo.addItem(row[1], row[0]) + + def _load_i2c_commands(self): + """Load I2C commands into dropdown.""" + self.cmd_combo.clear() + cursor = self.db_conn.execute( + "SELECT command_id, command_name FROM i2c_commands WHERE is_active=1 ORDER BY command_name" + ) + for row in cursor.fetchall(): + self.cmd_combo.addItem(row[1], row[0]) def _load_profiles(self): """Load profiles from database.""" @@ -197,13 +223,22 @@ class ConfigureSessionWidget(QWidget): self.command_sequence = [] for cmd in json_obj.get("commands", []): cmd_id = cmd["command_id"] - # Get command name from database - c = self.db_conn.execute( - "SELECT command_name FROM uart_commands WHERE command_id = ?", (cmd_id,) - ) + cmd_type = cmd.get("command_type", "uart") # Default to uart for backwards compatibility + + # Get command name from correct table based on type + if cmd_type == "i2c": + c = self.db_conn.execute( + "SELECT command_name FROM i2c_commands WHERE command_id = ?", (cmd_id,) + ) + else: + c = self.db_conn.execute( + "SELECT command_name FROM uart_commands WHERE command_id = ?", (cmd_id,) + ) + cmd_row = c.fetchone() if cmd_row: self.command_sequence.append({ + "command_type": cmd_type, "command_id": cmd_id, "command_name": cmd_row[0], "delay_ms": cmd["delay_ms"] @@ -220,25 +255,29 @@ class ConfigureSessionWidget(QWidget): """Update sequence list display.""" self.sequence_list.clear() for i, cmd in enumerate(self.command_sequence, 1): - text = f"{i}. {cmd['command_name']} ({cmd['delay_ms']}ms)" + cmd_type = cmd.get('command_type', 'uart') + type_label = "UART" if cmd_type == "uart" else "I2C" + text = f"{i}. [{type_label}] {cmd['command_name']} ({cmd['delay_ms']}ms)" self.sequence_list.addItem(text) - + def _on_add_command(self): """Add command to sequence.""" if self.cmd_combo.count() == 0: - QMessageBox.warning(self, "Error", "No UART commands available") + QMessageBox.warning(self, "Error", "No commands available") return - + + cmd_type = self.cmd_type_combo.currentData() cmd_id = self.cmd_combo.currentData() cmd_name = self.cmd_combo.currentText() delay = self.delay_spin.value() - + self.command_sequence.append({ + "command_type": cmd_type, "command_id": cmd_id, "command_name": cmd_name, "delay_ms": delay }) - + self._update_sequence_list() self.btn_save.setEnabled(True) @@ -339,7 +378,11 @@ class ConfigureSessionWidget(QWidget): # Build JSON json_obj = { "commands": [ - {"command_id": cmd["command_id"], "delay_ms": cmd["delay_ms"]} + { + "command_id": cmd["command_id"], + "command_type": cmd.get("command_type", "uart"), + "delay_ms": cmd["delay_ms"] + } for cmd in self.command_sequence ] } diff --git a/database/database/ehinge.db b/database/database/ehinge.db deleted file mode 100644 index e48a96a2595b099a24dcc68e2aee7115520a2d47..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 98304 zcmeI#yGp}Q9Khj|TI)5oTCI*56{U+WV3lqT-WDgR5<&1nwJyG#v#YDVfiK~|V3!KI ziNlw}kCWsiIq+>cKHR?=l*Q%U{Y`IBY^O%bvb0kaDWzt#_Ejt9Vby4KWTTTG=4NU? z_49>x%JTP=zYj;{qwDfR`-s&os({H(%Ieb7Guw@-}Y|Gt$1@IrR?>WO)V23K!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PJ`8Kt9~9ky8Dw{!)MW)2(erfB*pk1PBly zK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF z5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk z1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs z0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZ zfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&U zAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009C7 z2oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+009Eu1yWm%So28y-Fk)A=+4*4*6PrFao4B*=y?67QWRC5)&Oorr+pgyvzbjs`Dl`&YJGo o&$ye9mA{XH{{w#?{{sGl{5Ll?*6<4nGOIJ91RLA*1@jpb0B_FW{Qv*} delta 2532 zcmbuAPiP!f9LML){+*pa&tM?hK(lGVSZLb!XZ~bokeJqptAeFy!O(Uq$%q@mG9Ol8c~+p7c=KUWD3GL@0Rk)b}R4Gn?I=t$$$Wu)puj z@AH0t-{qa;<(=fl_()L@gnjJEut#LiMEx*L6on&;pt&?P3lf{_v#)^zn|EfPjg6#l zZ>F9<*sL|vt=lghN+n0MEkWCADJM3x`U$zUb+#~f07`iQehpvLe$}d4O#MV1RlZQN z^0)Hg%y#CObVJHFYX>E-2B7S`F)2u30#I6UE0tpDVu2uHU`$W}Q{5zb{$g>pFkf{S zFRi+T7u@M0|G)Pg%KuJdJWY_eSh}!s-d%N<(xEYniLIM9(f67>W@A`Qb{uvFDdG%N z*K-|w7nTw{6S@niVrb)3s@r<5&t#B_%AnpbDk0W?rxKf3Jyd4ORg^uUdVTq(grNUL zMg3G#m1SpsP^jK+m61*{e%-CC7FNoow~Oc9uIh5F%jdu}qnxc%oBQfCLAGtutC=(M=T8FA9xq-Lk{v zaI|kCxSDnPBX))3mdmAbr=4~Y6xl{sNqDY(QzbE+im~294WX?6*@A0# zEy@j`VqVek4Nmn<3vtEQy#4L&CuZ%b-oW!tt^l+k!YW9_AU-;qmy+ac~i^qV4*|^fZv2v4P<% z^CiHJZ5ZaeZrQysBwfrs`*lUm_kt%#rq%B4~y0cR+Sh z{z_26aX~u*e^l?t_tbBsm!+puPsFZ@e~TsX6FA4tu&SoUjvfV9CR!(d zyCrVEdiQfFsielHrocz9wEn#N4q&E6>+FM%#W?Gg`Fd@xb?=Y;pDA%=S%7(ES-q)! z*Q_ll+so_V6yWbXRi7Je+`IyQk!s@!Nx*7tu2EbAwDE8a9BTZv21Xjk*1`S^<}*u> ejcFtQDfl57ht~x7a9h~`AHe#?Xye+K;M^ndVRu^q diff --git a/database/init_database.py b/database/init_database.py index bbc3fcc..6c8dfa2 100644 --- a/database/init_database.py +++ b/database/init_database.py @@ -152,18 +152,19 @@ CREATE TABLE IF NOT EXISTS "sessions" ( CREATE TABLE IF NOT EXISTS "telemetry_raw" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, - + "session_id" TEXT NOT NULL, "session_name" TEXT NOT NULL, "run_no" INTEGER NOT NULL, "run_command_id" INTEGER NOT NULL, - + "t_ns" INTEGER NOT NULL, "time_ms" REAL, - + "uart_raw_packet" BLOB, "i2c_raw_bytes" BLOB, - + "i2c_zero_ref" INTEGER DEFAULT 0, + FOREIGN KEY ("session_id") REFERENCES "sessions"("session_id"), FOREIGN KEY ("run_command_id") REFERENCES "uart_commands"("command_id") ); @@ -174,33 +175,34 @@ CREATE TABLE IF NOT EXISTS "telemetry_raw" ( CREATE TABLE IF NOT EXISTS "telemetry_decoded" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, - + "session_id" TEXT NOT NULL, "session_name" TEXT NOT NULL, "run_no" INTEGER NOT NULL, "run_command_id" INTEGER NOT NULL, - + "t_ns" INTEGER NOT NULL, "time_ms" REAL, - + -- UART decoded "motor_current" INTEGER, "encoder_value" INTEGER, "relative_encoder_value" INTEGER, "v24_pec_diff" INTEGER, "pwm" INTEGER, - + -- I2C decoded "i2c_raw14" INTEGER, "i2c_zero_raw14" INTEGER, "i2c_delta_raw14" INTEGER, "i2c_angle_deg" REAL, "i2c_zero_angle_deg" REAL, - + "i2c_zero_ref" INTEGER DEFAULT 0, + -- Derived "angular_velocity" REAL, "angular_acceleration" REAL, - + FOREIGN KEY ("session_id") REFERENCES "sessions"("session_id"), FOREIGN KEY ("run_command_id") REFERENCES "uart_commands"("command_id") ); diff --git a/i2c_old/i2c_command_editor.py b/i2c_old/i2c_command_editor.py new file mode 100644 index 0000000..f333b9c --- /dev/null +++ b/i2c_old/i2c_command_editor.py @@ -0,0 +1,76 @@ +# File: components/i2c/i2c_command_editor.py + +from PyQt6.QtWidgets import ( + QDialog, + QVBoxLayout, + QHBoxLayout, + QLineEdit, + QPushButton, + QFormLayout, + QComboBox, + QSpinBox, +) +from PyQt6.QtCore import Qt + + +class I2CCommandEditorDialog(QDialog): + def __init__(self, command=None): + super().__init__() + + self.setWindowTitle("Modify I2C Command" if command else "Add I2C Command") + + form_layout = QFormLayout() + form_layout.setLabelAlignment(Qt.AlignmentFlag.AlignRight) + form_layout.setFormAlignment(Qt.AlignmentFlag.AlignTop) + form_layout.setSpacing(10) + + # === Input Fields === + self.name_input = QLineEdit() + self.description_input = QLineEdit() + self.category_input = QLineEdit() + self.operation_input = QComboBox() + self.operation_input.addItems(["read", "write"]) + self.register_input = QSpinBox() + self.register_input.setRange(0, 255) + self.hex_input = QLineEdit() + self.hex_input.setPlaceholderText("0x01,0x02") + + if command: + self.name_input.setText(command.get("name", "")) + self.description_input.setText(command.get("description", "")) + self.category_input.setText(command.get("category", "")) + self.operation_input.setCurrentText(command.get("operation", "read")) + self.register_input.setValue(command.get("register", 0)) + self.hex_input.setText(command.get("hex_string", "")) + + form_layout.addRow("Name:", self.name_input) + form_layout.addRow("Description:", self.description_input) + form_layout.addRow("Category:", self.category_input) + form_layout.addRow("Operation:", self.operation_input) + form_layout.addRow("Register:", self.register_input) + form_layout.addRow("Hex:", self.hex_input) + + button_layout = QHBoxLayout() + self.ok_button = QPushButton("OK") + self.cancel_button = QPushButton("Cancel") + self.ok_button.clicked.connect(self.accept) + self.cancel_button.clicked.connect(self.reject) + button_layout.addStretch() + button_layout.addWidget(self.ok_button) + button_layout.addWidget(self.cancel_button) + + layout = QVBoxLayout(self) + layout.addLayout(form_layout) + layout.addLayout(button_layout) + + self.setMinimumWidth(400) + + def get_data(self): + return { + "name": self.name_input.text().strip(), + "description": self.description_input.text().strip(), + "category": self.category_input.text().strip(), + "operation": self.operation_input.currentText(), + "register": self.register_input.value(), + "hex_string": self.hex_input.text().strip(), + } diff --git a/i2c_old/i2c_logger_ui.py b/i2c_old/i2c_logger_ui.py new file mode 100644 index 0000000..0f9beea --- /dev/null +++ b/i2c_old/i2c_logger_ui.py @@ -0,0 +1,331 @@ +# components/i2c/i2c_logger_ui.py +# +from PyQt6.QtWidgets import ( + QWidget, + QVBoxLayout, + QHBoxLayout, + QPushButton, + QComboBox, + QLineEdit, + QSplitter, +) +from PyQt6.QtCore import QObject, pyqtSignal +from PyQt6.QtCore import Qt + + +from components.console.console_ui import console_widget +from components.console.console_registry import log_main_console +import components.items.elements as elements +import config.config as config + +from components.i2c.i2c_logic import I2CLogic, _coerce_int + + +class _SafeConsoleProxy(QObject): + log_signal = pyqtSignal(str, str) # level, message + + def __init__(self, console): + super().__init__() + self.log_signal.connect(console.log) + + def __call__(self, level, msg): + self.log_signal.emit(level, msg) + + +class I2CLoggerWidget(QWidget): + """Drop-in single-page I2C widget with table | console splitter.""" + + def __init__(self, parent=None): + super().__init__(parent) + self.i2c_logic = I2CLogic() + self.commands = self.i2c_logic.get_predefined_commands() + self.comboboxes = {} + self.connection_status = False + self.init_ui() + + def init_ui(self): + # Top controls + top_controls = QWidget() + top_controls_layout = QHBoxLayout(top_controls) + top_controls_layout.setContentsMargins(0, 0, 0, 0) + top_controls_layout.setSpacing(12) + top_controls_layout.setAlignment(Qt.AlignmentFlag.AlignTop) + # Port + self.comboboxes["port"] = QComboBox() + self.comboboxes["port"].addItems(self.i2c_logic.get_channels()) + top_controls_layout.addWidget( + elements.label_and_widget("Port", self.comboboxes["port"]) + ) + + self.button_refresh_channels = elements.create_icon_button( + config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4 + ) + top_controls_layout.addWidget(self.button_refresh_channels) + + # Address (empty until connected → scan) + self.comboboxes["address"] = QComboBox() + top_controls_layout.addWidget( + elements.label_and_widget("Address", self.comboboxes["address"]) + ) + + self.button_refresh_address = elements.create_icon_button( + config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4 + ) + top_controls_layout.addWidget(self.button_refresh_address) + + self.input_register = QLineEdit() + self.input_register.setPlaceholderText("0x04") + top_controls_layout.addWidget( + elements.label_and_widget("Register", self.input_register) + ) + + self.input_length = QLineEdit() + self.input_length.setPlaceholderText("1") + top_controls_layout.addWidget( + elements.label_and_widget("Length", self.input_length) + ) + + # Set Zero / Connect / start / Disconnect + self.button_set_zero = QPushButton("Set Zero") + top_controls_layout.addWidget( + elements.label_and_widget("", self.button_set_zero) + ) + + self.button_connect = QPushButton("Connect") + top_controls_layout.addWidget( + elements.label_and_widget("", self.button_connect) + ) + + self.button_start = QPushButton("Start") + top_controls_layout.addWidget(elements.label_and_widget("", self.button_start)) + + self.button_disconnect = QPushButton("Disconnect") + top_controls_layout.addWidget( + elements.label_and_widget("", self.button_disconnect) + ) + + col1_widget = QWidget() + col1_layout = QVBoxLayout(col1_widget) + # col1_layout.setContentsMargins(0, 0, 0, 0) + col1_layout.setSpacing(4) + + # Console + self.console = console_widget() + + # IMPORTANT: keep a strong reference so it isn’t GC’d + self._i2c_console_proxy = _SafeConsoleProxy(self.console) + self.i2c_logic.set_logger(self._i2c_console_proxy) + + # === Console only === + console_stack_widget = QWidget() + console_stack_layout = QVBoxLayout(console_stack_widget) + console_stack_layout.setContentsMargins(0, 0, 0, 0) + console_stack_layout.setSpacing(4) + console_stack_layout.addWidget(self.console) + + # === Splitter (kept for consistency, only console pane) === + splitter = QSplitter(Qt.Orientation.Horizontal) + splitter.addWidget(console_stack_widget) + splitter.setStretchFactor(0, 1) + + # === Main Layout === + main_layout = QVBoxLayout() + main_layout.setContentsMargins(4, 4, 4, 4) + main_layout.setSpacing(6) + main_layout.addWidget(top_controls) + main_layout.addWidget(splitter, stretch=1) + self.setLayout(main_layout) + + # Signals + self.button_refresh_channels.clicked.connect(self.refresh_channels) + self.button_refresh_address.clicked.connect(self.refresh_address) + self.button_set_zero.clicked.connect(self.set_zero) + self.button_connect.clicked.connect(self.connect) + self.button_start.clicked.connect(self.start_logging) + self.button_disconnect.clicked.connect(self.disconnect) + + self.disconnected_enable_status() + + # --- UI state toggles --- + def disconnected_enable_status(self): + elements.set_enabled_state(True, self.button_refresh_channels, grayOut=False) + elements.set_enabled_state(True, self.comboboxes["port"], grayOut=False) + elements.set_enabled_state(False, self.button_disconnect, grayOut=True) + elements.set_enabled_state(True, self.button_connect, grayOut=False) + elements.set_enabled_state(False, self.button_refresh_address, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True) + elements.set_enabled_state(False, self.input_register, grayOut=True) + elements.set_enabled_state(False, self.button_start, grayOut=True) + elements.set_enabled_state(False, self.input_length, grayOut=True) + + def connected_enable_status(self): + elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True) + elements.set_enabled_state(True, self.button_disconnect, grayOut=False) + elements.set_enabled_state(False, self.button_connect, grayOut=True) + elements.set_enabled_state(True, self.button_refresh_address, grayOut=False) + elements.set_enabled_state(True, self.comboboxes["address"], grayOut=False) + elements.set_enabled_state(True, self.input_register, grayOut=False) + elements.set_enabled_state(True, self.button_start, grayOut=False) + elements.set_enabled_state(True, self.input_length, grayOut=False) + + def started_enable_status(self): + elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True) + elements.set_enabled_state(True, self.button_disconnect, grayOut=False) + elements.set_enabled_state(False, self.button_connect, grayOut=True) + elements.set_enabled_state(False, self.button_refresh_address, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True) + elements.set_enabled_state(False, self.input_register, grayOut=True) + elements.set_enabled_state(False, self.button_start, grayOut=True) + elements.set_enabled_state(False, self.input_length, grayOut=True) + + # --- Buttons --- + def set_zero(self): + log_main_console("info", "Stting Zero") + self.i2c_logic.measure_zero_raw14() + + def connect(self): + log_main_console("info", "🔗 Connecting...") + success = self.i2c_logic.connect(self.comboboxes["port"].currentText()) + if success: + self.connected_enable_status() + self.refresh_address(silent=True) + self.connection_status = True + else: + elements.flash_button( + self.button_connect, flash_style="background-color: red;" + ) + + def start_logging(self): + # use the combobox address and user fields (reg, length) + addr = self._current_address_int() + reg_txt = (self.input_register.text() or "").strip() + len_txt = (self.input_length.text() or "").strip() or "1" + + def _to_int_any(s, default=None): + try: + return int(str(s), 0) + except Exception: + return default + + reg = _to_int_any(reg_txt) + length = _to_int_any(len_txt, default=1) + if reg is None or length is None or length <= 0: + self.console.log("warning", "⚠️ Invalid register or length") + return + + # start the logic-side logger (read-only continuous) + if self.i2c_logic.start_logger( + device_address=addr, reg=reg, length=length, interval_ms=100 + ): + self.started_enable_status() + else: + elements.flash_button( + self.button_start, flash_style="background-color: red;" + ) + + def disconnect(self): + log_main_console("info", "🔌 Disconnecting...") + if self.i2c_logic.disconnect(): + self.disconnected_enable_status() + self.connection_status = False + self.refresh_channels(silent=True) + else: + elements.flash_button( + self.button_disconnect, flash_style="background-color: red;" + ) + + def refresh_channels(self, silent: bool = False): + log_main_console("info", "🔄 Refreshing buses...") + self.comboboxes["port"].clear() + ports = self.i2c_logic.get_channels() + if ports: + self.comboboxes["port"].addItems(ports) + if not silent: + elements.flash_button(self.button_refresh_channels) + log_main_console("success", "🔄 Bus list refreshed") + else: + elements.flash_button( + self.button_refresh_channels, flash_style="background-color: red;" + ) + log_main_console("warn", "No I2C buses found") + + def refresh_address(self, silent: bool = False): + log_main_console("info", "🔄 Scanning bus for devices...") + self.comboboxes["address"].clear() + addresses = self.i2c_logic.scan_bus() + if addresses: + self.comboboxes["address"].addItems(addresses) + if not silent: + elements.flash_button(self.button_refresh_address) + log_main_console("success", "🔄 Bus scan complete") + else: + elements.flash_button( + self.button_refresh_address, flash_style="background-color: red;" + ) + log_main_console("info", "No devices detected") + + def get_current_config(self): + return {key: cb.currentText() for key, cb in self.comboboxes.items()} + + def _current_address_int(self) -> int: + txt = self.comboboxes["address"].currentText().strip() or "0x40" + return _coerce_int(txt) + + def send_command(self, command: dict): + self.i2c_logic.send_command(command, device_address=self._current_address_int()) + + def send_command_raw(self): + action = self.comboboxes["action"].currentText().strip().upper() + reg_txt = (self.input_register.text() or "").strip() + data_txt = (self.input_hex.text() or "").strip() + length_txt = (self.input_length.text() or "").strip() + + # --- helpers --- + def _to_int_any(s, default=None): + try: + return int(str(s), 0) # accepts "0xFE", "254" + except Exception: + return default + + def _split_tokens(s: str): + return [t for t in s.replace(",", " ").split() if t] + + # --- validate register --- + reg = _to_int_any(reg_txt) + if reg is None: + self.console.log("warning", "⚠️ Invalid or missing register (e.g. 0xFE).") + return + + cmd = { + "action": action.lower(), + "reg": reg, + } + + if action == "READ": + ln = _to_int_any(length_txt, default=1) + if ln is None or ln <= 0: + self.console.log("warning", "⚠️ Invalid length (must be >0).") + return + cmd["length"] = ln + + elif action == "WRITE": + toks = _split_tokens(data_txt) + if not toks: + self.console.log( + "warning", "⚠️ WRITE requires data (e.g. 00 or 01,02,03)." + ) + return + if len(toks) == 1: + v = _to_int_any(toks[0]) + if v is None: + self.console.log( + "warning", "⚠️ Invalid byte. Use hex/dec like 0x00 or 0." + ) + return + cmd["value"] = v + else: + cmd["bytes"] = data_txt # leave as string; logic will parse + + self.i2c_logic.send_command(cmd, device_address=self._current_address_int()) diff --git a/i2c_old/i2c_logic.py b/i2c_old/i2c_logic.py new file mode 100644 index 0000000..62f6aef --- /dev/null +++ b/i2c_old/i2c_logic.py @@ -0,0 +1,639 @@ +# ----------------------- +# File: components/i2c/i2c_logic.py +# +# Purpose +# ------- +# Unified I²C logic that supports: +# 1) One-shot command send under an exclusive "capture window" +# 2) A continuous background logger (thread) that politely backs off during capture +# +# Key Ideas +# --------- +# - We reuse the global coordinator (components.scheduler.coordinator) to mark an +# exclusive "capture window" while a one-shot I²C command is in flight. +# During this window, other sends are rejected, and the logger throttles +# output to keep the console readable and the UI snappy — same as UART. +# - The logger runs in its own daemon thread and performs periodic READs from a +# chosen register/length at a configurable interval. +# - All SMBus interactions are serialized by a single RLock so disconnect/close +# cannot race against in-flight reads/writes (prevents SIGSEGV). +# +# Tuning / Knobs +# -------------- +# - self._logger_sleep_s: how often the logger loop yields when idle/throttled +# (default ~50 Hz). Increase for less CPU, decrease for more responsiveness. +# - _throttle_ns (inside _logger_loop): during capture, the logger emits at most +# once every 50 ms. Increase to further reduce noise. +# - The default capture window in send_command() is ~300 ms; adjust if needed. +# +# Logging Format +# -------------- +# - TX (read): "➡️ | addr 0xAA | reg 0xRR | action READ | len N" +# - RX (n bytes): "⬅️ | addr 0xAA | reg 0xRR | data [0xV0, 0xV1, ...]" +# - TX (write1): "➡️ | addr 0xAA | reg 0xRR | action WRITE | value 0xVV" +# - TX (writeN): "➡️ | addr 0xAA | reg 0xRR | action WRITE | bytes [0x.., ...]" +# - OK (write): "✅ | addr 0xAA | reg 0xRR | status OK" +# - Errors: "❌ | addr 0xAA | reg 0xRR | error " +# +# ------------------------------------------------------------------------------ + +from __future__ import annotations + +import os +import time +import threading +from enum import Enum, auto +from typing import List, Optional, Union +import statistics +import smbus2 +import config.config as config + +from components.console.console_registry import log_main_console +from components.data.db import get_i2c_commands + +# Shared coordinator + timebase (UART-style exclusivity/throttling) +from components.scheduler.coordinator import ( + claim_i2c_capture, + release_i2c_capture, + i2c_capture_active, +) +from components.scheduler.timebase import now_ns + + +# --------------------------- +# Helpers (parsing and formatting) +# --------------------------- + + +def _coerce_int(val: Union[str, int], base: int = 10) -> int: + """Turn '1', '0x40', '64' or 1 into int. Raises ValueError on failure.""" + if isinstance(val, int): + return val + s = str(val).strip().lower() + if s.startswith("0x"): + return int(s, 16) + return int(s, base) + + +def _parse_hex_bytes_list(s: str) -> List[int]: + """ + Parse strings like '0x01,0x02,0x03' or '01 02 03' or '01, 02, 03' into a list of ints. + """ + if not s: + return [] + raw = [tok.strip() for tok in s.replace(",", " ").split()] + out = [] + for t in raw: + if not t: + continue + # accept '0x..' or plain hex/dec + out.append(int(t, 0)) + return out + + +# --------------------------- +# State Machine +# --------------------------- + + +class I2CState(Enum): + DISCONNECTED = auto() + CONNECTED_IDLE = auto() + LOGGING = auto() + CAPTURE_ACTIVE = auto() + + +# --------------------------- +# Main logic +# --------------------------- + + +class I2CLogic: + """ + Single class for: + - Connecting / disconnecting an I²C bus + - Sending one-shot register read/write commands under an exclusive capture window + - Running a continuous logger on the same bus (polls reg/length at an interval) + + Threading model + --------------- + - The logger runs in a background daemon thread started by start_logger(). + - All SMBus calls (read/write/close) are serialized by self._bus_lock (RLock). + - The command send path (send_command) runs in the UI/main thread, but it + briefly "claims" exclusivity via the shared coordinator so: + * Other sends are rejected during capture, + * The logger throttles its output during the capture window. + """ + + def __init__(self): + # Connection + self.bus: Optional[smbus2.SMBus] = None + self.bus_id: int = 1 # Raspberry Pi default /dev/i2c-1 + + # Threading & safety + self._bus_lock = threading.RLock() # serialize ALL SMBus I/O + self._logger_thread: Optional[threading.Thread] = None + self._logger_running: bool = False + self._logger_sleep_s: float = 0.05 # ~20 Hz idle yield + self._throttle_ns: int = 50_000_000 # 50 ms between emissions during capture + + # Logger configuration + self._log_addr: Optional[int] = None + self._log_reg: int = 0 + self._log_len: int = 1 + self._log_interval_s: float = 0.20 # default 200 ms between polls + self._shutting_down = False # prevent new work during disconnects + self._2_bytes_read_counter = ( + 0 # to cinfurm how many times we were in the read 2 bytes function + ) + # State + self._state = I2CState.DISCONNECTED + + # Logger injection (UI provides console logger) + self.log = lambda level, msg: None # default: drop logs until UI injects one + + # Inject the console logger (must be a callable: (type, message) -> None) + # components/i2c/i2c_logic.py + def set_logger(self, log_func): + if callable(log_func): + self.log = log_func + + # ----- convenience formatters ----- + def _fmt_hex(self, v: int, width: int = 2) -> str: + return f"0x{v:0{width}X}" + + def _fmt_bytes(self, data) -> str: + if data is None: + return "[]" + return "[" + ", ".join(f"0x{b:02X}" for b in data) + "]" + + def _to_int_any(self, v, default=None): + try: + return int(str(v).strip(), 0) # accepts "0xFE" or "254" + except Exception: + return default + + def _parse_hex_list(self, s: str) -> List[int]: + return _parse_hex_bytes_list(s) + + # --------------------------- + # State & Channels + # --------------------------- + + def state(self) -> I2CState: + return self._state + + def get_channels(self) -> List[str]: + """List available /dev/i2c-* buses as strings of bus ids.""" + i2c_devices = [] + try: + for entry in os.listdir("/dev"): + if entry.startswith("i2c-"): + try: + bus_id = int(entry.split("-")[1]) + i2c_devices.append(str(bus_id)) + except ValueError: + continue + except Exception as e: + self.log("warn", f"Could not list /dev for i2c buses: {e}") + return sorted(i2c_devices) + + def scan_bus(self) -> List[str]: + """ + Scan the currently opened bus for devices. + Returns like ['0x40', '0x41']. + """ + found: List[str] = [] + with self._bus_lock: + if self.bus is None: + return found + try: + for address in range(0x03, 0x78): # valid 7-bit range + try: + # write_quick is the classic probe; some devices NACK writes. + self.bus.write_quick(address) + found.append(f"0x{address:02X}") + except OSError: + continue + except Exception as e: + self.log("warn", f"Unexpected at 0x{address:02X}: {e}") + continue + self.log("info", f"Scan complete. Found devices: {found}") + except Exception as e: + self.log("error", f"Scan failed: {e}") + return found + + # --------------------------- + # Connection lifecycle + # --------------------------- + + def connect(self, port: Union[int, str]) -> bool: + """ + Open the I²C bus. 'port' can be an int or string ('1' -> /dev/i2c-1). + """ + if config.DEBUG_MODE: + self._state = I2CState.CONNECTED_IDLE + self.log("info", "DEBUG: simulate I²C connect") + self.log("success", f"🔗 Connected to I²C bus {port} (DEBUG)") + return True + + try: + self.bus_id = _coerce_int(port) + with self._bus_lock: + self.bus = smbus2.SMBus(self.bus_id) + self._state = I2CState.CONNECTED_IDLE + self.log("success", f"🔗 Connected to I²C bus {self.bus_id}") + return True + except FileNotFoundError: + self.log("error", f"I²C bus {port} not found (no /dev/i2c-{port}).") + except Exception as e: + self.log("error", f"❌ Connection failed: {e}") + + self._state = I2CState.DISCONNECTED + return False + + def disconnect(self) -> bool: + """ + Stop logger (join) and close the bus cleanly. + Safe to call even if already disconnected. + """ + if config.DEBUG_MODE: + self._state = I2CState.DISCONNECTED + self.log("info", "DEBUG: simulate I²C disconnect") + return True + + try: + # 1) Stop logger first so no thread touches the bus. + self.stop_logger() + + # 2) Close the bus under the same lock used for I/O. + with self._bus_lock: + if self.bus: + try: + self.bus.close() + except Exception: + pass + self.bus = None + + self._state = I2CState.DISCONNECTED + self.log("info", f"🔌 Disconnected from I²C bus {self.bus_id}") + return True + except Exception as e: + self.log("error", f"❌ Disconnection error: {e}") + return False + + # --------------------------- + # One-shot command + capture window (UART-style) + # --------------------------- + + def send_command(self, command, device_address=0x40): + """ + Execute a single I²C action under an exclusive capture window. + + command: either + - str (raw): "01,02,03" (block write to reg 0x00) + - dict: { + 'action'/'operation': 'read'|'write', + 'reg'/'register': int|str, + 'length': int (for read), + 'value': int (single-byte write), + 'bytes': '01,02,03' (block write string) + } + device_address: int or str ('0x40' accepted). If None, attempt to read + from command['address'] if present. + """ + if self._shutting_down: + self.log("warning", "⛔ I²C is shutting down; command ignored.") + return None + + # must be connected in real mode + if not config.DEBUG_MODE and not self.bus: + self.log("error", "❌ | bus not connected") + return None + + # Claim capture (exclusive) — reject if already in use + deadline_ns = now_ns() + 300_000_000 # ~300 ms default window + if not claim_i2c_capture(deadline_ns, owner="I2C_CMD"): + self.log("warning", "⛔ Busy: capture window active. Try again shortly.") + return None + + prev_state = self._state + self._state = I2CState.CAPTURE_ACTIVE + try: + # Normalize device address + if device_address is not None: + addr = self._to_int_any(device_address, default=0x40) + elif isinstance(command, dict) and "address" in command: + addr = self._to_int_any(command["address"], default=0x40) + else: + self.log("error", "❌ | no device address provided") + return None + + # RAW string mode (block write to reg 0x00) + if isinstance(command, str): + data = self._parse_hex_list(command) + self.log( + "info", + f"➡️ | addr {self._fmt_hex(addr)} | reg 0x00 | action WRITE | bytes {self._fmt_bytes(data)}", + ) + with self._bus_lock: + if not config.DEBUG_MODE: + self.bus.write_i2c_block_data(addr, 0x00, data) + self.log( + "success", f"✅ | addr {self._fmt_hex(addr)} | reg 0x00 | status OK" + ) + return True + + # Dict mode + action = ( + command.get("action") or command.get("operation") or "read" + ).upper() + reg = self._to_int_any( + command.get("reg", command.get("register", 0)), default=0 + ) + + if action == "READ": + length = int(command.get("length", 1)) + self.log( + "info", + f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action READ | len {length}", + ) + with self._bus_lock: + if not config.DEBUG_MODE: + if length <= 1: + data = [self.bus.read_byte_data(addr, reg)] + else: + data = self.bus.read_i2c_block_data(addr, reg, length) + else: + # synthetic data in DEBUG + data = [0xBE] if length <= 1 else list(range(length)) + self.log( + "success", + f"⬅️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | data {self._fmt_bytes(data)}", + ) + return data if length > 1 else data[0] + + elif action == "WRITE": + if "bytes" in command and command["bytes"]: + data = self._parse_hex_list(command["bytes"]) + self.log( + "info", + f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action WRITE | bytes {self._fmt_bytes(data)}", + ) + with self._bus_lock: + if not config.DEBUG_MODE: + self.bus.write_i2c_block_data(addr, reg, data) + self.log( + "success", + f"✅ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | status OK", + ) + return True + else: + value = self._to_int_any(command.get("value", 0)) + self.log( + "info", + f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action WRITE | value {self._fmt_hex(value)}", + ) + with self._bus_lock: + if not config.DEBUG_MODE: + self.bus.write_byte_data(addr, reg, value) + self.log( + "success", + f"✅ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | status OK", + ) + return True + + else: + self.log( + "warning", + f"⚠️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | unknown action '{action}'", + ) + return None + + except Exception as e: + self.log("error", f"❌ | error: {e}") + return None + finally: + release_i2c_capture() + # restore state (back to LOGGING if logger is on) + self._state = ( + I2CState.LOGGING if self._logger_running else I2CState.CONNECTED_IDLE + ) + + # --------------------------- + # Hot-path primitive (no logging, no waits) + # --------------------------- + def read_2_bytes(self, addr_7bit: int, reg: int) -> dict: + """ + Fast, on-demand I²C read of exactly 2 bytes from 'reg' at 7-bit address. + - Immediate-or-skip capture; no waits, no prints, no retries. + """ + # must be connected in real (non-debug) mode + if not config.DEBUG_MODE and self.bus is None: + return {"status": "ERR_NOT_CONNECTED", "addr": addr_7bit, "reg": reg} + + # Give the coordinator a tiny future window so claim is valid + deadline_ns = now_ns() + 5_000 + if not claim_i2c_capture(deadline_ns, owner="I2C_PKT"): + return {"status": "SKIPPED_BUSY", "addr": addr_7bit, "reg": reg} + + try: + with self._bus_lock: + data = self.bus.read_i2c_block_data(addr_7bit, reg, 2) + if not isinstance(data, list) or len(data) != 2: + return {"status": "ERR", "addr": addr_7bit, "reg": reg} + return { + "status": "OK", + "bytes": [data[0], data[1]], + } + except Exception: + return {"status": "ERR", "addr": addr_7bit, "reg": reg} + finally: + release_i2c_capture() + + def measure_zero_raw14(self, duration_ms: int = 200, interval_ms: int = 10): + """ + Measure zero for the I²C angle sensor. + Saves the result bytes into config.SESSION_ZERO = [MSB, LSB] (big-endian). + Returns the median raw14 (int) or None. + """ + samples = [] + deadline = time.time() + (duration_ms / 1000.0) + + try: + if self.connect(1): + self.log("info", "I²C auto-initialized on /dev/i2c-1 to set zero") + while time.time() < deadline: + rep = self.read_2_bytes( + 0x40, 0xFE + ) # dict: {"status":"OK","bytes":[b0,b1]} + if isinstance(rep, dict) and rep.get("status") == "OK": + by = rep.get("bytes") + if isinstance(by, list) and len(by) == 2: + raw16 = ((by[0] & 0xFF) << 8) | (by[1] & 0xFF) # big-endian + raw14 = raw16 & 0x3FFF + samples.append(raw14) + time.sleep(interval_ms / 1000.0) + + if not samples: + self.log("warn", "I²C zero measurement collected no valid samples") + return None + + med = int(statistics.median(samples)) + msb = (med >> 8) & 0xFF + lsb = med & 0xFF + + # Store exactly the two bytes + config.SESSION_ZERO = [msb, lsb] + + self.log( + "info", f"I²C zero raw14={med} -> SESSION_ZERO={msb:02X} {lsb:02X}" + ) + return med + + except Exception as e: + self.log("error", f"I²C zero measurement exception: {e}") + return None + + # --------------------------- + # Continuous Logger (thread) — UART-style + # --------------------------- + + def start_logger(self, device_address, reg, length, interval_ms: int = 200) -> bool: + """ + Start the background I²C logger thread. + + - If already running: no-op, returns True. + - If not connected (and not in DEBUG): returns False. + + Behavior: + - At each interval, performs a READ of 'length' bytes at 'reg' from 'device_address'. + - During a capture window, emissions are throttled to at most once every _throttle_ns. + """ + if self._logger_running: + self.log("info", "ℹ️ I²C logger already running.") + return True + if not config.DEBUG_MODE and not self.bus: + self.log("error", "⚠️ Cannot start I²C logger: not connected.") + return False + + addr = self._to_int_any(device_address) + r = self._to_int_any(reg) + ln = self._to_int_any(length, default=1) + if addr is None or r is None or ln is None or ln <= 0: + self.log("warning", "⚠️ Invalid logger params (addr/reg/length).") + return False + + self._log_addr = addr + self._log_reg = r + self._log_len = ln + self._log_interval_s = max(0.001, (interval_ms or 200) / 1000.0) + + self._logger_running = True + self._logger_thread = threading.Thread( + target=self._logger_loop, name="I2CLogger", daemon=True + ) + self._logger_thread.start() + self._state = I2CState.LOGGING + self.log( + "success", + f"🟢 I²C logger started | addr 0x{addr:02X} | reg 0x{r:02X} | len {ln} | interval {interval_ms}ms", + ) + return True + + def stop_logger(self) -> bool: + """ + Stop the background I²C logger thread if running. + Joins with a short timeout so shutdown remains responsive. + """ + if not self._logger_running: + return True + self._logger_running = False + t = self._logger_thread + self._logger_thread = None + + # Wait up to ~2s for a clean exit (covers a read that’s mid-call + pacing sleep) + if t and t.is_alive(): + t.join(timeout=2.0) + # If still alive, wait a tiny bit more — but don't hang the UI + if t.is_alive(): + time.sleep(0.05) + + if self.bus: + self._state = I2CState.CONNECTED_IDLE + self.log("info", "🔴 I²C logger stopped.") + return True + + def _logger_loop(self): + """ + Background poller (daemon thread): + - DEBUG_MODE: emits synthetic lines periodically so you can test the UI without hardware. + - Real bus: performs read(s) at ~_log_interval_s cadence. + - During capture: throttle emissions (drop extras) to one per _throttle_ns. + """ + addr = self._log_addr + r = self._log_reg + ln = self._log_len + pace = self._log_interval_s + + last_emit_ns = now_ns() + try: + while self._logger_running: + try: + # Respect capture throttle (like UART) + if self._shutting_down: + break # exit ASAP on shutdown + if i2c_capture_active(): + now = now_ns() + if now - last_emit_ns < self._throttle_ns: + time.sleep(self._logger_sleep_s) + continue + last_emit_ns = now + + # TX line + self.log( + "info", + f"➡️ | addr 0x{addr:02X} | reg 0x{r:02X} | action READ | len {ln}", + ) + + # Perform read under the same bus lock as send_command + with self._bus_lock: + if not self.bus and not config.DEBUG_MODE: + break + if not self.bus and not config.DEBUG_MODE: + break + + if config.DEBUG_MODE: + data = [0xBE] if ln <= 1 else list(range(ln)) + else: + if ln <= 1: + data = [self.bus.read_byte_data(addr, r)] + else: + data = self.bus.read_i2c_block_data(addr, r, ln) + + # RX line + self.log( + "success", + f"⬅️ | addr 0x{addr:02X} | reg 0x{r:02X} | data {self._fmt_bytes(data)}", + ) + + except Exception as e: + self.log( + "error", f"❌ | addr 0x{addr:02X} | reg 0x{r:02X} | error {e}" + ) + + # pacing + time.sleep(pace or self._logger_sleep_s) + finally: + self._logger_running = False + + # --------------------------- + # Commands catalog passthrough + # --------------------------- + + def get_predefined_commands(self): + """ + Load predefined I²C commands for your command table. + The DB returns rows mapped by components.data.db.get_i2c_commands(). + """ + return get_i2c_commands() diff --git a/i2c_old/i2c_polling.py b/i2c_old/i2c_polling.py new file mode 100644 index 0000000..e2cf56f --- /dev/null +++ b/i2c_old/i2c_polling.py @@ -0,0 +1,151 @@ +# File: components/i2c/i2c_polling.py + +from PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel, QPushButton, QHBoxLayout +from PyQt6.QtCore import QTimer, Qt +from PyQt6.QtGui import QPainter, QColor, QPen +from components.console.console_registry import log_main_console +from smbus2 import SMBus +import config.config as config +import math + +AS5048_ADDR = 0x40 +REG_ANGLE_MSB = 0xFE +REG_ANGLE_LSB = 0xFF + + +class AngleGauge(QWidget): + def __init__(self, parent=None): + super().__init__(parent) + self.setMinimumSize(200, 200) + self.angle = 0 # degrees + + def set_angle(self, angle): + self.angle = max(0.0, min(90.0, angle)) + self.update() + + def paintEvent(self, event): + painter = QPainter(self) + painter.setRenderHint(QPainter.RenderHint.Antialiasing) + + rect = self.rect() + width = rect.width() + height = rect.height() + center_x = width // 2 + bottom_y = height - 10 + length = height - 40 + + # Draw vertical Y axis + pen = QPen(QColor(180, 180, 180), 2) + painter.setPen(pen) + painter.drawLine(center_x, bottom_y, center_x, bottom_y - length) + + # Draw needle + radians = math.radians(self.angle) + end_x = int(center_x + length * math.sin(radians)) + end_y = int(bottom_y - length * math.cos(radians)) + + pen = QPen(QColor(0, 100, 200), 4) + painter.setPen(pen) + painter.drawLine(center_x, bottom_y, end_x, end_y) + + # Draw angle value + painter.setPen(Qt.GlobalColor.black) + painter.drawText( + rect, + int(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignHCenter), + f"{self.angle:.1f}°", + ) + + +class I2CPollingWidget(QWidget): + def __init__(self, bus_id=1, parent=None): + super().__init__(parent) + + self.label = QLabel("AS5048A Angle Sensor") + self.gauge = AngleGauge() + self.zero_raw = None + + self.start_button = QPushButton("Start") + self.stop_button = QPushButton("Stop") + self.set_zero_button = QPushButton("Set Zero") + + self.start_button.clicked.connect(self.start_polling) + self.stop_button.clicked.connect(self.stop_polling) + self.set_zero_button.clicked.connect(self.set_zero) + + button_layout = QHBoxLayout() + button_layout.addWidget(self.start_button) + button_layout.addWidget(self.stop_button) + button_layout.addWidget(self.set_zero_button) + + layout = QVBoxLayout() + layout.addWidget(self.label) + layout.addWidget(self.gauge) + layout.addLayout(button_layout) + self.setLayout(layout) + + self.bus_id = bus_id + self.bus = None + self.timer = QTimer(self) + self.timer.timeout.connect(self.read_angle) + + if not config.DEBUG_MODE: + try: + self.bus = SMBus(self.bus_id) + log_main_console("success", f"AS5048A ready on bus {self.bus_id}") + except Exception as e: + log_main_console("error", f"Failed to open I2C bus {self.bus_id}: {e}") + else: + log_main_console("info", "DEBUG_MODE: fake polling setup complete") + + def start_polling(self): + self.timer.start(50) + log_main_console("info", "Started I2C angle polling") + + def stop_polling(self): + self.timer.stop() + log_main_console("info", "Stopped I2C angle polling") + + def set_zero(self): + raw = self._read_raw_angle() + if raw is not None: + self.zero_raw = raw + log_main_console("info", f"Zero set to raw value: {self.zero_raw}") + + def _read_raw_angle(self): + if config.DEBUG_MODE: + import random + + return random.randint(0, 16383) + try: + msb = self.bus.read_byte_data(AS5048_ADDR, REG_ANGLE_MSB) + lsb = self.bus.read_byte_data(AS5048_ADDR, REG_ANGLE_LSB) + raw = ((msb << 6) | (lsb & 0x3F)) & 0x3FFF + return raw + except Exception as e: + log_main_console("error", f"I2C read failed: {e}") + return None + + def read_angle(self): + current_raw = self._read_raw_angle() + if current_raw is None or self.zero_raw is None: + return + + # Calculate signed delta + diff = (current_raw - self.zero_raw + 8192) % 16384 - 8192 + angle_deg = -((diff * 360.0) / 16384.0) + + # Clamp to 0–90° range for display + angle_clamped = max(0.0, min(90.0, angle_deg)) + + log_main_console( + "info", + f"Read raw: {current_raw}, Δ: {diff}, Degrees: {angle_deg:.2f}° → Gauge: {angle_clamped:.2f}°", + ) + self.gauge.set_angle(angle_clamped) + + def closeEvent(self, event): + self.timer.stop() + if self.bus: + self.bus.close() + super().closeEvent(event) diff --git a/i2c_old/i2c_ui.py b/i2c_old/i2c_ui.py new file mode 100644 index 0000000..9dd2c9f --- /dev/null +++ b/i2c_old/i2c_ui.py @@ -0,0 +1,342 @@ +from PyQt6.QtWidgets import ( + QWidget, + QVBoxLayout, + QHBoxLayout, + QPushButton, + QComboBox, + QLineEdit, + QSplitter, +) +from PyQt6.QtCore import Qt +from PyQt6.QtCore import QObject, pyqtSignal + +from components.console.console_ui import console_widget +from components.console.console_registry import log_main_console +import components.items.elements as elements +from components.commands.command_table_ui import command_table_widget +import config.config as config + +from components.i2c.i2c_logic import I2CLogic, _coerce_int +from components.i2c.i2c_command_editor import I2CCommandEditorDialog +from components.data import db + + +class _SafeConsoleProxy(QObject): + log_signal = pyqtSignal(str, str) + + def __init__(self, console): + super().__init__() + self.log_signal.connect(console.log) + + def __call__(self, level, msg): + # called from worker thread → emits safely into UI thread + self.log_signal.emit(level, msg) + + +class I2CHandler: + """Adapter used by command_table_widget → forwards to the I2CWidget.""" + + def __init__(self, widget: "I2CWidget"): + self.w = widget + + def get_command_list(self): + return self.w.i2c_logic.get_predefined_commands() + + def send_command(self, command: dict): + self.w.send_command(command) + + def add_command(self): + dialog = I2CCommandEditorDialog() + if dialog.exec(): + command = dialog.get_data() + db.add_i2c_command(command) + + def modify_command(self, command): + dialog = I2CCommandEditorDialog(command=command) + if dialog.exec(): + updated = dialog.get_data() + updated["id"] = command["id"] + db.modify_i2c_command(updated) + + def delete_command(self, command): + db.delete_i2c_command(command) + + +class I2CWidget(QWidget): + """Drop-in single-page I2C widget with table | console splitter.""" + + def __init__(self, parent=None): + super().__init__(parent) + self.i2c_logic = I2CLogic() + self.handler = I2CHandler(self) + self.commands = self.i2c_logic.get_predefined_commands() + self.comboboxes = {} + self.connection_status = False + self.init_ui() + + def init_ui(self): + # Top controls + top_controls = QWidget() + top_controls_layout = QHBoxLayout(top_controls) + top_controls_layout.setContentsMargins(0, 0, 0, 0) + top_controls_layout.setSpacing(12) + top_controls_layout.setAlignment(Qt.AlignmentFlag.AlignTop) + + # Port + self.comboboxes["port"] = QComboBox() + self.comboboxes["port"].addItems(self.i2c_logic.get_channels()) + top_controls_layout.addWidget( + elements.label_and_widget("Port", self.comboboxes["port"]) + ) + + self.button_refresh_channels = elements.create_icon_button( + config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4 + ) + top_controls_layout.addWidget(self.button_refresh_channels) + + # Address (empty until connected → scan) + self.comboboxes["address"] = QComboBox() + top_controls_layout.addWidget( + elements.label_and_widget("Address", self.comboboxes["address"]) + ) + + self.button_refresh_address = elements.create_icon_button( + config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4 + ) + top_controls_layout.addWidget(self.button_refresh_address) + + # Connect / Disconnect + self.button_connect = QPushButton("Connect") + top_controls_layout.addWidget( + elements.label_and_widget("", self.button_connect) + ) + + self.button_disconnect = QPushButton("Disconnect") + top_controls_layout.addWidget( + elements.label_and_widget("", self.button_disconnect) + ) + + # Command table + self.command_table = command_table_widget( + commands=self.commands, handler=self.handler + ) + + col1_widget = QWidget() + col1_layout = QVBoxLayout(col1_widget) + col1_layout.setContentsMargins(0, 0, 0, 0) + col1_layout.setSpacing(4) + col1_layout.addWidget(self.command_table) + + # Input + Send + input_line_layout = QHBoxLayout() + input_line_layout.setContentsMargins(0, 0, 0, 0) + input_line_layout.setSpacing(4) + + self.comboboxes["action"] = QComboBox() + self.comboboxes["action"].addItems(["Read", "Write"]) + input_line_layout.addWidget( + elements.label_and_widget("Action", self.comboboxes["action"]) + ) + self.input_register = QLineEdit() + self.input_register.setPlaceholderText("0x04") + input_line_layout.addWidget( + elements.label_and_widget("Register", self.input_register) + ) + + self.input_hex = QLineEdit() + self.input_hex.setPlaceholderText( + "Hex bytes (e.g. 0x01,0x02,0x03) → block write to reg 0x00" + ) + input_line_layout.addWidget(elements.label_and_widget("Data", self.input_hex)) + self.input_length = QLineEdit() + + self.input_length.setPlaceholderText("1") + input_line_layout.addWidget( + elements.label_and_widget("Length", self.input_length) + ) + + self.button_send_raw = QPushButton("Send Raw") + input_line_layout.addWidget(elements.label_and_widget("", self.button_send_raw)) + + # Console + self.console = console_widget() + self.i2c_logic.set_logger(self.console.log) + + console_stack_widget = QWidget() + console_stack_layout = QVBoxLayout(console_stack_widget) + console_stack_layout.setContentsMargins(0, 0, 0, 0) + console_stack_layout.setSpacing(4) + console_stack_layout.addLayout(input_line_layout) + console_stack_layout.addWidget(self.console) + + # Splitter: table | console + splitter = QSplitter(Qt.Orientation.Horizontal) + splitter.addWidget(col1_widget) + splitter.addWidget(console_stack_widget) + splitter.setSizes([740, 1200]) + splitter.setStretchFactor(0, 0) + splitter.setStretchFactor(1, 1) + + # Main layout + main_layout = QVBoxLayout() + main_layout.setContentsMargins(4, 4, 4, 4) + main_layout.setSpacing(6) + main_layout.addWidget(top_controls) + main_layout.addWidget(splitter, stretch=1) + self.setLayout(main_layout) + + # Signals + self.button_refresh_channels.clicked.connect(self.refresh_channels) + self.button_refresh_address.clicked.connect(self.refresh_address) + self.button_connect.clicked.connect(self.connect) + self.button_disconnect.clicked.connect(self.disconnect) + self.button_send_raw.clicked.connect(self.send_command_raw) + + self.disconnected_enable_status() + + # --- UI state toggles --- + def disconnected_enable_status(self): + elements.set_enabled_state(True, self.button_refresh_channels, grayOut=False) + elements.set_enabled_state(True, self.comboboxes["port"], grayOut=False) + elements.set_enabled_state(True, self.command_table, grayOut=False) + elements.set_enabled_state(False, self.input_hex, grayOut=True) + elements.set_enabled_state(False, self.button_send_raw, grayOut=True) + elements.set_enabled_state(False, self.button_disconnect, grayOut=True) + elements.set_enabled_state(True, self.button_connect, grayOut=False) + elements.set_enabled_state(False, self.button_refresh_address, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True) + elements.set_enabled_state(False, self.comboboxes["action"], grayOut=True) + elements.set_enabled_state(False, self.input_register, grayOut=True) + + def connected_enable_status(self): + elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True) + elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True) + elements.set_enabled_state(True, self.command_table, grayOut=False) + elements.set_enabled_state(True, self.input_hex, grayOut=False) + elements.set_enabled_state(True, self.button_send_raw, grayOut=False) + elements.set_enabled_state(True, self.button_disconnect, grayOut=False) + elements.set_enabled_state(False, self.button_connect, grayOut=True) + elements.set_enabled_state(True, self.button_refresh_address, grayOut=False) + elements.set_enabled_state(True, self.comboboxes["address"], grayOut=False) + elements.set_enabled_state(True, self.comboboxes["action"], grayOut=False) + elements.set_enabled_state(True, self.input_register, grayOut=False) + + # --- Buttons --- + def connect(self): + log_main_console("info", "🔗 Connecting...") + success = self.i2c_logic.connect(self.comboboxes["port"].currentText()) + if success: + self.connected_enable_status() + self.command_table.set_connected_state() + self.refresh_address(silent=True) + self.connection_status = True + else: + elements.flash_button( + self.button_connect, flash_style="background-color: red;" + ) + + def disconnect(self): + log_main_console("info", "🔌 Disconnecting...") + if self.i2c_logic.stop_logger(): + self.disconnected_enable_status() + self.command_table.set_disconnected_state() + self.connection_status = False + self.refresh_channels(silent=True) + else: + elements.flash_button( + self.button_disconnect, flash_style="background-color: red;" + ) + + def refresh_channels(self, silent: bool = False): + log_main_console("info", "🔄 Refreshing buses...") + self.comboboxes["port"].clear() + ports = self.i2c_logic.get_channels() + if ports: + self.comboboxes["port"].addItems(ports) + if not silent: + elements.flash_button(self.button_refresh_channels) + log_main_console("success", "🔄 Bus list refreshed") + else: + elements.flash_button( + self.button_refresh_channels, flash_style="background-color: red;" + ) + log_main_console("warn", "No I2C buses found") + + def refresh_address(self, silent: bool = False): + log_main_console("info", "🔄 Scanning bus for devices...") + self.comboboxes["address"].clear() + addresses = self.i2c_logic.scan_bus() + if addresses: + self.comboboxes["address"].addItems(addresses) + if not silent: + elements.flash_button(self.button_refresh_address) + log_main_console("success", "🔄 Bus scan complete") + else: + elements.flash_button( + self.button_refresh_address, flash_style="background-color: red;" + ) + log_main_console("info", "No devices detected") + + def get_current_config(self): + return {key: cb.currentText() for key, cb in self.comboboxes.items()} + + def _current_address_int(self) -> int: + txt = self.comboboxes["address"].currentText().strip() or "0x40" + return _coerce_int(txt) + + def send_command(self, command: dict): + self.i2c_logic.send_command(command, device_address=self._current_address_int()) + + def send_command_raw(self): + action = self.comboboxes["action"].currentText().strip().upper() + reg_txt = (self.input_register.text() or "").strip() + data_txt = (self.input_hex.text() or "").strip() + length_txt = (self.input_length.text() or "").strip() + + # --- helpers --- + def _to_int_any(s, default=None): + try: + return int(str(s), 0) # accepts "0xFE", "254" + except Exception: + return default + + def _split_tokens(s: str): + return [t for t in s.replace(",", " ").split() if t] + + # --- validate register --- + reg = _to_int_any(reg_txt) + if reg is None: + self.console.log("warning", "⚠️ Invalid or missing register (e.g. 0xFE).") + return + + cmd = { + "action": action.lower(), + "reg": reg, + } + + if action == "READ": + ln = _to_int_any(length_txt, default=1) + if ln is None or ln <= 0: + self.console.log("warning", "⚠️ Invalid length (must be >0).") + return + cmd["length"] = ln + + elif action == "WRITE": + toks = _split_tokens(data_txt) + if not toks: + self.console.log( + "warning", "⚠️ WRITE requires data (e.g. 00 or 01,02,03)." + ) + return + if len(toks) == 1: + v = _to_int_any(toks[0]) + if v is None: + self.console.log( + "warning", "⚠️ Invalid byte. Use hex/dec like 0x00 or 0." + ) + return + cmd["value"] = v + else: + cmd["bytes"] = data_txt # leave as string; logic will parse + + self.i2c_logic.send_command(cmd, device_address=self._current_address_int()) diff --git a/run.py b/run.py index ecdc298..b880bf7 100644 --- a/run.py +++ b/run.py @@ -54,16 +54,16 @@ from decoder import decode_uart_packet, decode_i2c_sample class RunExecutor: """ Executes a single RUN. - + A RUN consists of: - - Send UART command + - Send UART command OR execute I2C command - Collect UART packets (with timestamps) - Trigger I2C reads via callback (correlated timestamps) - Wait for stop condition - Decode all data - Save to database """ - + def __init__(self, db_connection: sqlite3.Connection): """ Initialize run executor. @@ -74,6 +74,7 @@ class RunExecutor: self.db_conn = db_connection self.i2c_readings = [] # Storage for I2C readings from callback self.i2c_failures = 0 # Counter for I2C read failures + self.i2c_zero_reference = 0 # Absolute angle used as zero (0 = not zeroed) def execute_run( self, @@ -390,7 +391,146 @@ class RunExecutor: pass return ("error", 0, f"Exception during run: {str(e)}") - + + def execute_i2c_command( + self, + session_id: str, + session_name: str, + run_no: int, + command_id: int, + command_name: str, + operation: str, + device_address: int, + register: int, + hex_string: str, + i2c_port: Optional[I2CHandle], + raw_data_callback = None + ) -> Tuple[str, int, str]: + """ + Execute a single I2C command. + + Args: + session_id: Session ID + session_name: Session name + run_no: Run number + command_id: I2C command ID from database + command_name: Command name (e.g., "zero", "read angle") + operation: Operation type (e.g., "read", "write", "zero") + device_address: I2C device address (e.g., 0x40) + register: Register address (e.g., 0xFE) + hex_string: Number of bytes to read/write + i2c_port: I2C handle + raw_data_callback: Callback for status updates + + Returns: + (status, data_count, error_msg) + """ + try: + if not i2c_port: + return ("error", 0, "I2C port not available") + + run_start_ns = time.time_ns() + + # Parse hex_string as byte count for reads + try: + byte_count = int(hex_string, 16) if isinstance(hex_string, str) else int(hex_string) + except: + byte_count = 2 # Default to 2 bytes + + # Special handling for "zero" operation + if operation.lower() == "zero" or "zero" in command_name.lower(): + return self._execute_i2c_zero( + session_id=session_id, + session_name=session_name, + run_no=run_no, + command_id=command_id, + device_address=device_address, + register=register, + i2c_port=i2c_port, + run_start_ns=run_start_ns, + raw_data_callback=raw_data_callback + ) + + # Regular I2C read + elif operation.lower() == "read": + if raw_data_callback: + raw_data_callback("INFO", f"Read: addr=0x{device_address:02X} reg=0x{register:02X} len={byte_count}") + + status, data = i2c_read_block(i2c_port, device_address, register, byte_count) + + if status == I2CStatus.OK: + hex_data = ' '.join(f'{b:02X}' for b in data) + if raw_data_callback: + raw_data_callback("RX", f"{hex_data}") + return ("success", len(data), "") + else: + return ("error", 0, f"I2C read failed: {status}") + + else: + return ("error", 0, f"Unsupported I2C operation: {operation}") + + except Exception as e: + return ("error", 0, f"Exception during I2C command: {str(e)}") + + def _execute_i2c_zero( + self, + session_id: str, + session_name: str, + run_no: int, + command_id: int, + device_address: int, + register: int, + i2c_port: I2CHandle, + run_start_ns: int, + raw_data_callback = None + ) -> Tuple[str, int, str]: + """ + Execute I2C zeroing: take 50 samples, calculate median as zero reference. + + Returns: + (status, sample_count, error_msg) + """ + import statistics + + if raw_data_callback: + raw_data_callback("INFO", "Zeroing: Collecting 50 samples...") + + samples = [] + duration_ms = 500 # 500ms total + interval_ms = 10 # 10ms between samples (50 samples) + deadline = time.time() + (duration_ms / 1000.0) + + while time.time() < deadline and len(samples) < 50: + status, data = i2c_read_block(i2c_port, device_address, register, 2) + + if status == I2CStatus.OK and len(data) == 2: + # Convert to 14-bit raw value (big-endian) + raw16 = ((data[0] & 0xFF) << 8) | (data[1] & 0xFF) + raw14 = raw16 & 0x3FFF + samples.append(raw14) + + time.sleep(interval_ms / 1000.0) + + if not samples: + if raw_data_callback: + raw_data_callback("ERROR", "I2C Zeroing failed: No valid samples") + return ("error", 0, "No valid samples collected") + + # Calculate median (better than average for outlier rejection) + median_raw14 = int(statistics.median(samples)) + self.i2c_zero_reference = median_raw14 + + # Convert to angle (360° / 16384 counts) + zero_angle_deg = (median_raw14 * 360.0) / 16384.0 + + if raw_data_callback: + raw_data_callback("INFO", f"✓ Zero set: raw14={median_raw14} ({zero_angle_deg:.2f}°) from {len(samples)} samples") + + # Zero reference stored in memory only (self.i2c_zero_reference) + # Will be saved to telemetry tables when UART commands execute + + return ("success", len(samples), "") + def _parse_hex_string(self, hex_str: str) -> Optional[bytes]: """ Parse hex string to bytes. @@ -436,8 +576,8 @@ class RunExecutor: cursor.execute(""" INSERT INTO telemetry_raw ( session_id, session_name, run_no, run_command_id, - t_ns, time_ms, uart_raw_packet, i2c_raw_bytes - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + t_ns, time_ms, uart_raw_packet, i2c_raw_bytes, i2c_zero_ref + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( session_id, session_name, @@ -446,7 +586,8 @@ class RunExecutor: packet_info.start_timestamp, time_ms, packet_info.data, - i2c_bytes # Can be None if no I2C + i2c_bytes, # Can be None if no I2C + self.i2c_zero_reference # Zero reference (0 if not zeroed) )) # Save to telemetry_decoded (main data) @@ -455,15 +596,16 @@ class RunExecutor: cursor.execute(""" INSERT INTO telemetry_decoded ( session_id, session_name, run_no, run_command_id, - t_ns, time_ms - ) VALUES (?, ?, ?, ?, ?, ?) + t_ns, time_ms, i2c_zero_ref + ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( session_id, session_name, run_no, run_command_id, packet_info.start_timestamp, - time_ms + time_ms, + self.i2c_zero_reference # Zero reference (0 if not zeroed) )) # TODO: When decoder is fully implemented, also save: @@ -490,6 +632,7 @@ def execute_run( i2c_register: int = 0xFE, stop_timeout_ms: int = 5000, grace_timeout_ms: int = 1500, + i2c_zero_ref: int = 0, raw_data_callback = None ) -> Tuple[str, int, str]: """ @@ -506,14 +649,18 @@ def execute_run( uart_logger_port: UART logger port (RX for telemetry, optional) i2c_port: I2C port (optional) packet_config: Packet detection configuration + i2c_address: I2C device address + i2c_register: I2C register address stop_timeout_ms: Stop condition timeout grace_timeout_ms: Grace period before first packet + i2c_zero_ref: I2C zero reference (0 = not zeroed) raw_data_callback: Callback for raw data display (direction, hex_string) Returns: (status, packet_count, error_msg) """ executor = RunExecutor(db_connection) + executor.i2c_zero_reference = i2c_zero_ref # Set zero reference from session return executor.execute_run( session_id=session_id, session_name=session_name, @@ -532,6 +679,63 @@ def execute_run( ) +def execute_i2c_command( + db_connection: sqlite3.Connection, + session_id: str, + session_name: str, + run_no: int, + command_id: int, + command_name: str, + operation: str, + device_address: int, + register: int, + hex_string: str, + i2c_port: Optional[I2CHandle], + i2c_zero_ref: int = 0, + raw_data_callback = None +) -> Tuple[str, int, str, int]: + """ + Execute a single I2C command (convenience function). + + Args: + db_connection: Database connection + session_id: Session ID + session_name: Session name + run_no: Run number + command_id: I2C command ID + command_name: Command name + operation: Operation type (read/write/zero) + device_address: I2C device address + register: Register address + hex_string: Byte count or data + i2c_port: I2C handle + i2c_zero_ref: Current I2C zero reference (0 = not zeroed) + raw_data_callback: Callback for status updates + + Returns: + (status, data_count, error_msg, updated_i2c_zero_ref) + """ + executor = RunExecutor(db_connection) + executor.i2c_zero_reference = i2c_zero_ref # Set zero reference from session + + status, data_count, error_msg = executor.execute_i2c_command( + session_id=session_id, + session_name=session_name, + run_no=run_no, + command_id=command_id, + command_name=command_name, + operation=operation, + device_address=device_address, + register=register, + hex_string=hex_string, + i2c_port=i2c_port, + raw_data_callback=raw_data_callback + ) + + # Return updated zero reference (may have changed if zero command) + return (status, data_count, error_msg, executor.i2c_zero_reference) + + if __name__ == "__main__": print("Run Module") print("=" * 60) diff --git a/session.py b/session.py index 31a2de4..a71baaf 100644 --- a/session.py +++ b/session.py @@ -60,7 +60,7 @@ from i2c.i2c_kit.i2c_core import ( ) # Run executor -from run import execute_run +from run import execute_run, execute_i2c_command # Database manager from database.init_database import DatabaseManager @@ -139,8 +139,14 @@ class Session(QObject): # Phase execution (multi-phase support: Init, Execute, De-init) self.phases: List[Dict[str, Any]] = [] # List of phase configs self.total_commands: int = 0 # Total commands across all phases + self.total_uart_runs: int = 0 # Total UART commands (actual runs) + self.total_i2c_commands: int = 0 # Total I2C commands (not runs) self.current_command_index: int = 0 # Global command counter - + self.current_phase_name: str = "" # Current executing phase name + + # I2C zero reference (persists across all runs in session) + self.i2c_zero_reference: int = 0 # Absolute angle used as zero (0 = not zeroed) + # Execution control flags self.is_running: bool = False self.is_paused: bool = False @@ -243,6 +249,8 @@ class Session(QObject): # Storage for phase configurations self.phases = [] # List of dicts: {'name': str, 'commands': list, 'profile_id': int} self.total_commands = 0 + self.total_uart_runs = 0 + self.total_i2c_commands = 0 # Load Init phase if init_session_id is not None: @@ -252,6 +260,12 @@ class Session(QObject): if phase_config: self.phases.append(phase_config) self.total_commands += len(phase_config['commands']) + # Count by type + for cmd in phase_config['commands']: + if cmd.get('command_type', 'uart') == 'i2c': + self.total_i2c_commands += 1 + else: + self.total_uart_runs += 1 # Load Execute phase if execute_session_id is not None: @@ -261,6 +275,12 @@ class Session(QObject): if phase_config: self.phases.append(phase_config) self.total_commands += len(phase_config['commands']) + # Count by type + for cmd in phase_config['commands']: + if cmd.get('command_type', 'uart') == 'i2c': + self.total_i2c_commands += 1 + else: + self.total_uart_runs += 1 # Load De-init phase if deinit_session_id is not None: @@ -270,6 +290,12 @@ class Session(QObject): if phase_config: self.phases.append(phase_config) self.total_commands += len(phase_config['commands']) + # Count by type + for cmd in phase_config['commands']: + if cmd.get('command_type', 'uart') == 'i2c': + self.total_i2c_commands += 1 + else: + self.total_uart_runs += 1 # Check at least one phase loaded if len(self.phases) == 0: @@ -295,7 +321,7 @@ class Session(QObject): self.status_changed.emit(f"Multi-phase session loaded: {self.session_name}") self.status_changed.emit(f"Interface: {self.interface_config['profile_name']}") self.status_changed.emit(f"Phases: {phase_summary}") - self.status_changed.emit(f"Total commands: {self.total_commands}") + self.status_changed.emit(f"Total: {self.total_uart_runs} UART runs, {self.total_i2c_commands} I2C commands") return True @@ -352,21 +378,44 @@ class Session(QObject): self.error_occurred.emit(f"{phase_name}: Command missing command_id") return (False, None) - # Check if command exists and load details - cursor = self.db_conn.execute(""" - SELECT command_name, hex_string - FROM uart_commands - WHERE command_id = ? - """, (cmd_id,)) - - row = cursor.fetchone() - if not row: - self.error_occurred.emit(f"{phase_name}: UART command {cmd_id} not found") - return (False, None) - - # Store command details - cmd['command_name'] = row[0] - cmd['hex_string'] = row[1] + # Check command type (default to 'uart' for backwards compatibility) + command_type = cmd.get('command_type', 'uart') + + if command_type == 'i2c': + # Load I2C command from database + cursor = self.db_conn.execute(""" + SELECT command_name, operation, register, hex_string, device_address + FROM i2c_commands + WHERE command_id = ? + """, (cmd_id,)) + + row = cursor.fetchone() + if not row: + self.error_occurred.emit(f"{phase_name}: I2C command {cmd_id} not found") + return (False, None) + + # Store I2C command details + cmd['command_name'] = row[0] + cmd['operation'] = row[1] + cmd['register'] = row[2] + cmd['hex_string'] = row[3] + cmd['device_address'] = row[4] + else: + # Load UART command from database (default) + cursor = self.db_conn.execute(""" + SELECT command_name, hex_string + FROM uart_commands + WHERE command_id = ? + """, (cmd_id,)) + + row = cursor.fetchone() + if not row: + self.error_occurred.emit(f"{phase_name}: UART command {cmd_id} not found") + return (False, None) + + # Store UART command details + cmd['command_name'] = row[0] + cmd['hex_string'] = row[1] # Create phase config phase_config = { @@ -669,13 +718,14 @@ class Session(QObject): # =================================================================== # 3. Set execution state # =================================================================== - + self.is_running = True self.is_paused = False self.pause_queued = False self.stop_queued = False self.current_command_index = 0 - + self.i2c_zero_reference = 0 # Reset zero reference for new session + # Emit session started self.session_started.emit(self.current_session_id) self.status_changed.emit(f"Session started: {self.session_name}") @@ -710,11 +760,13 @@ class Session(QObject): ALL remaining phases. """ try: - global_cmd_index = 0 # Track overall command number + global_cmd_index = 0 # Track UART run number + total_commands_executed = 0 # Track all commands (UART + I2C) # Loop through phases for phase_index, phase in enumerate(self.phases, 1): phase_name = phase['name'] + self.current_phase_name = phase_name # Track current phase phase_commands = phase['commands'] total_phases = len(self.phases) @@ -723,11 +775,9 @@ class Session(QObject): # Loop through commands in this phase for cmd_index_in_phase, cmd in enumerate(phase_commands, 1): - global_cmd_index += 1 - self.current_command_index = global_cmd_index - + total_commands_executed += 1 # Count all commands # =============================================================== - # 1. Check if stop was queued (before starting new run) + # 1. Check if stop was queued (before starting new command) # =============================================================== if self.stop_queued: @@ -736,57 +786,118 @@ class Session(QObject): return # =============================================================== - # 2. Emit command started + # 2. Check command type - I2C commands don't count as runs # =============================================================== + command_type = cmd.get('command_type', 'uart') command_name = cmd['command_name'] - self.command_started.emit(global_cmd_index, command_name) - self.status_changed.emit( - f"[{phase_name}] Command {cmd_index_in_phase}/{len(phase_commands)} " - f"(Total: {global_cmd_index}/{self.total_commands}): {command_name}" - ) - + + # Only increment run number for UART commands + if command_type == 'uart': + global_cmd_index += 1 + self.current_command_index = global_cmd_index + # =============================================================== - # 3. Execute run via run.py + # 3. Emit command started # =============================================================== - status, packet_count, error_msg = execute_run( - db_connection=self.db_conn, - session_id=self.current_session_id, - session_name=self.session_name, - run_no=global_cmd_index, - command_id=cmd['command_id'], - command_hex=cmd['hex_string'], - uart_command_port=self.uart_command_port, - uart_logger_port=self.uart_logger_port, - i2c_port=self.i2c_handle, - packet_config=self.packet_config, - i2c_address=int(self.interface_config['i2c_slave_address'], 16) if self.interface_config.get('i2c_slave_address') else 0x40, - i2c_register=int(self.interface_config['i2c_slave_read_register'], 16) if self.interface_config.get('i2c_slave_read_register') else 0xFE, - stop_timeout_ms=self.interface_config['uart_logger_timeout_ms'], - grace_timeout_ms=self.interface_config['uart_logger_grace_ms'], - raw_data_callback=lambda direction, hex_str: self.raw_data_received.emit(direction, hex_str) - ) + if command_type == 'i2c': + # I2C command - formatted output: [Phase] [I2C] [INFO] message + self.raw_data_received.emit( + f"[{phase_name}] [I2C] [INFO]", + f"Command: {command_name}" + ) + else: + # UART command - formatted output: [Phase] [UART] [INFO] message + self.command_started.emit(global_cmd_index, command_name) + self.raw_data_received.emit( + f"[{phase_name}] [UART] [INFO]", + f"Run {global_cmd_index}: {command_name}" + ) # =============================================================== - # 4. Handle run result + # 4. Execute command (UART or I2C) via run.py + # =============================================================== + + if command_type == 'i2c': + # Execute I2C command (doesn't count as run, no telemetry saved) + # Callback formats as: [Phase] [I2C] [Action] message + status, packet_count, error_msg, updated_zero_ref = execute_i2c_command( + db_connection=self.db_conn, + session_id=self.current_session_id, + session_name=self.session_name, + run_no=0, # Not a run + command_id=cmd['command_id'], + command_name=cmd['command_name'], + operation=cmd.get('operation', 'read'), + device_address=int(cmd.get('device_address', '0x40'), 16), + register=int(cmd.get('register', '0xFE'), 16), + hex_string=cmd.get('hex_string', '02'), + i2c_port=self.i2c_handle, + i2c_zero_ref=self.i2c_zero_reference, + raw_data_callback=lambda action, msg: self.raw_data_received.emit( + f"[{phase_name}] [I2C] [{action}]", msg + ) + ) + # Update session's zero reference (may have changed if zero command) + self.i2c_zero_reference = updated_zero_ref + else: + # Execute UART command (counts as run, saves telemetry) + # Callback formats as: [Phase] [UART] [Action] message + status, packet_count, error_msg = execute_run( + db_connection=self.db_conn, + session_id=self.current_session_id, + session_name=self.session_name, + run_no=global_cmd_index, + command_id=cmd['command_id'], + command_hex=cmd['hex_string'], + uart_command_port=self.uart_command_port, + uart_logger_port=self.uart_logger_port, + i2c_port=self.i2c_handle, + packet_config=self.packet_config, + i2c_address=int(self.interface_config['i2c_slave_address'], 16) if self.interface_config.get('i2c_slave_address') else 0x40, + i2c_register=int(self.interface_config['i2c_slave_read_register'], 16) if self.interface_config.get('i2c_slave_read_register') else 0xFE, + stop_timeout_ms=self.interface_config['uart_logger_timeout_ms'], + grace_timeout_ms=self.interface_config['uart_logger_grace_ms'], + i2c_zero_ref=self.i2c_zero_reference, + raw_data_callback=lambda action, msg: self.raw_data_received.emit( + f"[{phase_name}] [UART] [{action}]", msg + ) + ) + + # =============================================================== + # 5. Handle command result # =============================================================== if status == "error": - # Run failed - abort session (all phases) - self.error_occurred.emit(f"[{phase_name}] Run {global_cmd_index} failed: {error_msg}") + # Command failed - abort session (all phases) + if command_type == 'i2c': + self.error_occurred.emit(f"[{phase_name}] I2C command '{command_name}' failed: {error_msg}") + else: + self.error_occurred.emit(f"[{phase_name}] Run {global_cmd_index} failed: {error_msg}") self._finalize_session('error') return - # Run succeeded - emit completion - self.run_completed.emit(global_cmd_index, packet_count) - self.status_changed.emit(f"[{phase_name}] Run {global_cmd_index} complete: {packet_count} packets") - - # Update total runs in database - self.db_conn.execute(""" - UPDATE sessions SET total_runs = ? WHERE session_id = ? - """, (global_cmd_index, self.current_session_id)) - self.db_conn.commit() + # Command succeeded + if command_type == 'uart': + # UART run - emit completion and update database + self.run_completed.emit(global_cmd_index, packet_count) + self.raw_data_received.emit( + f"[{phase_name}] [UART] [INFO]", + f"Complete: {packet_count} packets" + ) + + # Update total runs in database + self.db_conn.execute(""" + UPDATE sessions SET total_runs = ? WHERE session_id = ? + """, (global_cmd_index, self.current_session_id)) + self.db_conn.commit() + else: + # I2C command - just log completion (no run counting) + self.raw_data_received.emit( + f"[{phase_name}] [I2C] [INFO]", + f"Complete" + ) # =============================================================== # 5. Delay between commands (with countdown and queue check) @@ -796,7 +907,7 @@ class Session(QObject): delay_ms = cmd.get('delay_ms', 3000) # Only delay if not last command overall - if global_cmd_index < self.total_commands: + if total_commands_executed < self.total_commands: self._execute_delay(delay_ms) # Check if pause/stop was queued during delay diff --git a/session_widget.py b/session_widget.py index b272433..ebf0094 100644 --- a/session_widget.py +++ b/session_widget.py @@ -468,14 +468,15 @@ class SessionWidget(QWidget): def _on_command_started(self, command_no: int, command_name: str): """Handle command started signal.""" self.executing_label.setText(f"Executing: {command_name}") - total = self.session.total_commands - self.command_label.setText(f"Command: {command_no} / {total}") - self._log_info(f"[{command_no}/{total}] {command_name}") + total = self.session.total_uart_runs + self.command_label.setText(f"Run: {command_no} / {total}") + # Don't log here - already logged via raw_data_received with formatted tags @pyqtSlot(int, int) def _on_run_completed(self, run_no: int, packet_count: int): """Handle run completed signal.""" - self._log_info(f"Run {run_no} complete: {packet_count} packets detected") + # Don't log here - already logged via raw_data_received with formatted tags + pass @pyqtSlot(int) def _on_delay_countdown(self, seconds_remaining: int): @@ -531,25 +532,45 @@ class SessionWidget(QWidget): Handle raw UART data display. Args: - direction: "TX", "RX", or "ERROR" + direction: "TX", "RX", "ERROR", "INFO" or formatted "[Phase] [Interface] [Action]" hex_string: Hex bytes (e.g., "EF FE 01 02 03") or error message """ - if direction == "TX": - color = "green" - prefix = "→ TX" - elif direction == "ERROR": - color = "red" - prefix = "✗ ERROR" - elif direction == "INFO": - color = "gray" - prefix = "ℹ INFO" + # Check if direction is already formatted (starts with '[') + if direction.startswith('['): + # New format: [Phase] [Interface] [Action] message + # Extract action for color coding + if '[TX]' in direction: + color = "green" + elif '[ERROR]' in direction: + color = "red" + elif '[INFO]' in direction: + color = "gray" + else: # RX or other + color = "blue" + + # Display full formatted string + self.log_display.append( + f"{direction} {hex_string}" + ) else: - color = "blue" - prefix = "← RX" + # Old format: Simple direction + message + if direction == "TX": + color = "green" + prefix = "→ TX" + elif direction == "ERROR": + color = "red" + prefix = "✗ ERROR" + elif direction == "INFO": + color = "gray" + prefix = "ℹ INFO" + else: + color = "blue" + prefix = "← RX" + + self.log_display.append( + f"[{prefix}] {hex_string}" + ) - self.log_display.append( - f"[{prefix}] {hex_string}" - ) # Auto-scroll to bottom self.log_display.verticalScrollBar().setValue( self.log_display.verticalScrollBar().maximum()