i2c command added data logger ui is much more condncsed, redy for decoding

main
Kynsight 3 weeks ago
parent 70eb58531f
commit de53882ead

@ -100,8 +100,17 @@ class ConfigureSessionWidget(QWidget):
add_layout = QVBoxLayout()
add_form = QFormLayout()
# Command type selector
self.cmd_type_combo = QComboBox()
self.cmd_type_combo.addItem("UART", "uart")
self.cmd_type_combo.addItem("I2C", "i2c")
self.cmd_type_combo.currentIndexChanged.connect(self._on_command_type_changed)
add_form.addRow("Command Type:", self.cmd_type_combo)
# Command selector (will be populated based on type)
self.cmd_combo = QComboBox()
add_form.addRow("UART Command:", self.cmd_combo)
add_form.addRow("Command:", self.cmd_combo)
self.delay_spin = QSpinBox()
self.delay_spin.setRange(0, 60000)
@ -156,6 +165,14 @@ class ConfigureSessionWidget(QWidget):
widget.setLayout(layout)
return widget
def _on_command_type_changed(self):
"""Load commands when type changes."""
cmd_type = self.cmd_type_combo.currentData()
if cmd_type == "uart":
self._load_uart_commands()
else:
self._load_i2c_commands()
def _load_uart_commands(self):
"""Load UART commands into dropdown."""
self.cmd_combo.clear()
@ -165,6 +182,15 @@ class ConfigureSessionWidget(QWidget):
for row in cursor.fetchall():
self.cmd_combo.addItem(row[1], row[0])
def _load_i2c_commands(self):
"""Load I2C commands into dropdown."""
self.cmd_combo.clear()
cursor = self.db_conn.execute(
"SELECT command_id, command_name FROM i2c_commands WHERE is_active=1 ORDER BY command_name"
)
for row in cursor.fetchall():
self.cmd_combo.addItem(row[1], row[0])
def _load_profiles(self):
"""Load profiles from database."""
self.profile_list.clear()
@ -197,13 +223,22 @@ class ConfigureSessionWidget(QWidget):
self.command_sequence = []
for cmd in json_obj.get("commands", []):
cmd_id = cmd["command_id"]
# Get command name from database
cmd_type = cmd.get("command_type", "uart") # Default to uart for backwards compatibility
# Get command name from correct table based on type
if cmd_type == "i2c":
c = self.db_conn.execute(
"SELECT command_name FROM i2c_commands WHERE command_id = ?", (cmd_id,)
)
else:
c = self.db_conn.execute(
"SELECT command_name FROM uart_commands WHERE command_id = ?", (cmd_id,)
)
cmd_row = c.fetchone()
if cmd_row:
self.command_sequence.append({
"command_type": cmd_type,
"command_id": cmd_id,
"command_name": cmd_row[0],
"delay_ms": cmd["delay_ms"]
@ -220,20 +255,24 @@ class ConfigureSessionWidget(QWidget):
"""Update sequence list display."""
self.sequence_list.clear()
for i, cmd in enumerate(self.command_sequence, 1):
text = f"{i}. {cmd['command_name']} ({cmd['delay_ms']}ms)"
cmd_type = cmd.get('command_type', 'uart')
type_label = "UART" if cmd_type == "uart" else "I2C"
text = f"{i}. [{type_label}] {cmd['command_name']} ({cmd['delay_ms']}ms)"
self.sequence_list.addItem(text)
def _on_add_command(self):
"""Add command to sequence."""
if self.cmd_combo.count() == 0:
QMessageBox.warning(self, "Error", "No UART commands available")
QMessageBox.warning(self, "Error", "No commands available")
return
cmd_type = self.cmd_type_combo.currentData()
cmd_id = self.cmd_combo.currentData()
cmd_name = self.cmd_combo.currentText()
delay = self.delay_spin.value()
self.command_sequence.append({
"command_type": cmd_type,
"command_id": cmd_id,
"command_name": cmd_name,
"delay_ms": delay
@ -339,7 +378,11 @@ class ConfigureSessionWidget(QWidget):
# Build JSON
json_obj = {
"commands": [
{"command_id": cmd["command_id"], "delay_ms": cmd["delay_ms"]}
{
"command_id": cmd["command_id"],
"command_type": cmd.get("command_type", "uart"),
"delay_ms": cmd["delay_ms"]
}
for cmd in self.command_sequence
]
}

Binary file not shown.

Binary file not shown.

@ -163,6 +163,7 @@ CREATE TABLE IF NOT EXISTS "telemetry_raw" (
"uart_raw_packet" BLOB,
"i2c_raw_bytes" BLOB,
"i2c_zero_ref" INTEGER DEFAULT 0,
FOREIGN KEY ("session_id") REFERENCES "sessions"("session_id"),
FOREIGN KEY ("run_command_id") REFERENCES "uart_commands"("command_id")
@ -196,6 +197,7 @@ CREATE TABLE IF NOT EXISTS "telemetry_decoded" (
"i2c_delta_raw14" INTEGER,
"i2c_angle_deg" REAL,
"i2c_zero_angle_deg" REAL,
"i2c_zero_ref" INTEGER DEFAULT 0,
-- Derived
"angular_velocity" REAL,

@ -0,0 +1,76 @@
# File: components/i2c/i2c_command_editor.py
from PyQt6.QtWidgets import (
QDialog,
QVBoxLayout,
QHBoxLayout,
QLineEdit,
QPushButton,
QFormLayout,
QComboBox,
QSpinBox,
)
from PyQt6.QtCore import Qt
class I2CCommandEditorDialog(QDialog):
def __init__(self, command=None):
super().__init__()
self.setWindowTitle("Modify I2C Command" if command else "Add I2C Command")
form_layout = QFormLayout()
form_layout.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
form_layout.setFormAlignment(Qt.AlignmentFlag.AlignTop)
form_layout.setSpacing(10)
# === Input Fields ===
self.name_input = QLineEdit()
self.description_input = QLineEdit()
self.category_input = QLineEdit()
self.operation_input = QComboBox()
self.operation_input.addItems(["read", "write"])
self.register_input = QSpinBox()
self.register_input.setRange(0, 255)
self.hex_input = QLineEdit()
self.hex_input.setPlaceholderText("0x01,0x02")
if command:
self.name_input.setText(command.get("name", ""))
self.description_input.setText(command.get("description", ""))
self.category_input.setText(command.get("category", ""))
self.operation_input.setCurrentText(command.get("operation", "read"))
self.register_input.setValue(command.get("register", 0))
self.hex_input.setText(command.get("hex_string", ""))
form_layout.addRow("Name:", self.name_input)
form_layout.addRow("Description:", self.description_input)
form_layout.addRow("Category:", self.category_input)
form_layout.addRow("Operation:", self.operation_input)
form_layout.addRow("Register:", self.register_input)
form_layout.addRow("Hex:", self.hex_input)
button_layout = QHBoxLayout()
self.ok_button = QPushButton("OK")
self.cancel_button = QPushButton("Cancel")
self.ok_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject)
button_layout.addStretch()
button_layout.addWidget(self.ok_button)
button_layout.addWidget(self.cancel_button)
layout = QVBoxLayout(self)
layout.addLayout(form_layout)
layout.addLayout(button_layout)
self.setMinimumWidth(400)
def get_data(self):
return {
"name": self.name_input.text().strip(),
"description": self.description_input.text().strip(),
"category": self.category_input.text().strip(),
"operation": self.operation_input.currentText(),
"register": self.register_input.value(),
"hex_string": self.hex_input.text().strip(),
}

@ -0,0 +1,331 @@
# components/i2c/i2c_logger_ui.py
#
from PyQt6.QtWidgets import (
QWidget,
QVBoxLayout,
QHBoxLayout,
QPushButton,
QComboBox,
QLineEdit,
QSplitter,
)
from PyQt6.QtCore import QObject, pyqtSignal
from PyQt6.QtCore import Qt
from components.console.console_ui import console_widget
from components.console.console_registry import log_main_console
import components.items.elements as elements
import config.config as config
from components.i2c.i2c_logic import I2CLogic, _coerce_int
class _SafeConsoleProxy(QObject):
log_signal = pyqtSignal(str, str) # level, message
def __init__(self, console):
super().__init__()
self.log_signal.connect(console.log)
def __call__(self, level, msg):
self.log_signal.emit(level, msg)
class I2CLoggerWidget(QWidget):
"""Drop-in single-page I2C widget with table | console splitter."""
def __init__(self, parent=None):
super().__init__(parent)
self.i2c_logic = I2CLogic()
self.commands = self.i2c_logic.get_predefined_commands()
self.comboboxes = {}
self.connection_status = False
self.init_ui()
def init_ui(self):
# Top controls
top_controls = QWidget()
top_controls_layout = QHBoxLayout(top_controls)
top_controls_layout.setContentsMargins(0, 0, 0, 0)
top_controls_layout.setSpacing(12)
top_controls_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
# Port
self.comboboxes["port"] = QComboBox()
self.comboboxes["port"].addItems(self.i2c_logic.get_channels())
top_controls_layout.addWidget(
elements.label_and_widget("Port", self.comboboxes["port"])
)
self.button_refresh_channels = elements.create_icon_button(
config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4
)
top_controls_layout.addWidget(self.button_refresh_channels)
# Address (empty until connected → scan)
self.comboboxes["address"] = QComboBox()
top_controls_layout.addWidget(
elements.label_and_widget("Address", self.comboboxes["address"])
)
self.button_refresh_address = elements.create_icon_button(
config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4
)
top_controls_layout.addWidget(self.button_refresh_address)
self.input_register = QLineEdit()
self.input_register.setPlaceholderText("0x04")
top_controls_layout.addWidget(
elements.label_and_widget("Register", self.input_register)
)
self.input_length = QLineEdit()
self.input_length.setPlaceholderText("1")
top_controls_layout.addWidget(
elements.label_and_widget("Length", self.input_length)
)
# Set Zero / Connect / start / Disconnect
self.button_set_zero = QPushButton("Set Zero")
top_controls_layout.addWidget(
elements.label_and_widget("", self.button_set_zero)
)
self.button_connect = QPushButton("Connect")
top_controls_layout.addWidget(
elements.label_and_widget("", self.button_connect)
)
self.button_start = QPushButton("Start")
top_controls_layout.addWidget(elements.label_and_widget("", self.button_start))
self.button_disconnect = QPushButton("Disconnect")
top_controls_layout.addWidget(
elements.label_and_widget("", self.button_disconnect)
)
col1_widget = QWidget()
col1_layout = QVBoxLayout(col1_widget)
# col1_layout.setContentsMargins(0, 0, 0, 0)
col1_layout.setSpacing(4)
# Console
self.console = console_widget()
# IMPORTANT: keep a strong reference so it isnt GCd
self._i2c_console_proxy = _SafeConsoleProxy(self.console)
self.i2c_logic.set_logger(self._i2c_console_proxy)
# === Console only ===
console_stack_widget = QWidget()
console_stack_layout = QVBoxLayout(console_stack_widget)
console_stack_layout.setContentsMargins(0, 0, 0, 0)
console_stack_layout.setSpacing(4)
console_stack_layout.addWidget(self.console)
# === Splitter (kept for consistency, only console pane) ===
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.addWidget(console_stack_widget)
splitter.setStretchFactor(0, 1)
# === Main Layout ===
main_layout = QVBoxLayout()
main_layout.setContentsMargins(4, 4, 4, 4)
main_layout.setSpacing(6)
main_layout.addWidget(top_controls)
main_layout.addWidget(splitter, stretch=1)
self.setLayout(main_layout)
# Signals
self.button_refresh_channels.clicked.connect(self.refresh_channels)
self.button_refresh_address.clicked.connect(self.refresh_address)
self.button_set_zero.clicked.connect(self.set_zero)
self.button_connect.clicked.connect(self.connect)
self.button_start.clicked.connect(self.start_logging)
self.button_disconnect.clicked.connect(self.disconnect)
self.disconnected_enable_status()
# --- UI state toggles ---
def disconnected_enable_status(self):
elements.set_enabled_state(True, self.button_refresh_channels, grayOut=False)
elements.set_enabled_state(True, self.comboboxes["port"], grayOut=False)
elements.set_enabled_state(False, self.button_disconnect, grayOut=True)
elements.set_enabled_state(True, self.button_connect, grayOut=False)
elements.set_enabled_state(False, self.button_refresh_address, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True)
elements.set_enabled_state(False, self.input_register, grayOut=True)
elements.set_enabled_state(False, self.button_start, grayOut=True)
elements.set_enabled_state(False, self.input_length, grayOut=True)
def connected_enable_status(self):
elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True)
elements.set_enabled_state(True, self.button_disconnect, grayOut=False)
elements.set_enabled_state(False, self.button_connect, grayOut=True)
elements.set_enabled_state(True, self.button_refresh_address, grayOut=False)
elements.set_enabled_state(True, self.comboboxes["address"], grayOut=False)
elements.set_enabled_state(True, self.input_register, grayOut=False)
elements.set_enabled_state(True, self.button_start, grayOut=False)
elements.set_enabled_state(True, self.input_length, grayOut=False)
def started_enable_status(self):
elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True)
elements.set_enabled_state(True, self.button_disconnect, grayOut=False)
elements.set_enabled_state(False, self.button_connect, grayOut=True)
elements.set_enabled_state(False, self.button_refresh_address, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True)
elements.set_enabled_state(False, self.input_register, grayOut=True)
elements.set_enabled_state(False, self.button_start, grayOut=True)
elements.set_enabled_state(False, self.input_length, grayOut=True)
# --- Buttons ---
def set_zero(self):
log_main_console("info", "Stting Zero")
self.i2c_logic.measure_zero_raw14()
def connect(self):
log_main_console("info", "🔗 Connecting...")
success = self.i2c_logic.connect(self.comboboxes["port"].currentText())
if success:
self.connected_enable_status()
self.refresh_address(silent=True)
self.connection_status = True
else:
elements.flash_button(
self.button_connect, flash_style="background-color: red;"
)
def start_logging(self):
# use the combobox address and user fields (reg, length)
addr = self._current_address_int()
reg_txt = (self.input_register.text() or "").strip()
len_txt = (self.input_length.text() or "").strip() or "1"
def _to_int_any(s, default=None):
try:
return int(str(s), 0)
except Exception:
return default
reg = _to_int_any(reg_txt)
length = _to_int_any(len_txt, default=1)
if reg is None or length is None or length <= 0:
self.console.log("warning", "⚠️ Invalid register or length")
return
# start the logic-side logger (read-only continuous)
if self.i2c_logic.start_logger(
device_address=addr, reg=reg, length=length, interval_ms=100
):
self.started_enable_status()
else:
elements.flash_button(
self.button_start, flash_style="background-color: red;"
)
def disconnect(self):
log_main_console("info", "🔌 Disconnecting...")
if self.i2c_logic.disconnect():
self.disconnected_enable_status()
self.connection_status = False
self.refresh_channels(silent=True)
else:
elements.flash_button(
self.button_disconnect, flash_style="background-color: red;"
)
def refresh_channels(self, silent: bool = False):
log_main_console("info", "🔄 Refreshing buses...")
self.comboboxes["port"].clear()
ports = self.i2c_logic.get_channels()
if ports:
self.comboboxes["port"].addItems(ports)
if not silent:
elements.flash_button(self.button_refresh_channels)
log_main_console("success", "🔄 Bus list refreshed")
else:
elements.flash_button(
self.button_refresh_channels, flash_style="background-color: red;"
)
log_main_console("warn", "No I2C buses found")
def refresh_address(self, silent: bool = False):
log_main_console("info", "🔄 Scanning bus for devices...")
self.comboboxes["address"].clear()
addresses = self.i2c_logic.scan_bus()
if addresses:
self.comboboxes["address"].addItems(addresses)
if not silent:
elements.flash_button(self.button_refresh_address)
log_main_console("success", "🔄 Bus scan complete")
else:
elements.flash_button(
self.button_refresh_address, flash_style="background-color: red;"
)
log_main_console("info", "No devices detected")
def get_current_config(self):
return {key: cb.currentText() for key, cb in self.comboboxes.items()}
def _current_address_int(self) -> int:
txt = self.comboboxes["address"].currentText().strip() or "0x40"
return _coerce_int(txt)
def send_command(self, command: dict):
self.i2c_logic.send_command(command, device_address=self._current_address_int())
def send_command_raw(self):
action = self.comboboxes["action"].currentText().strip().upper()
reg_txt = (self.input_register.text() or "").strip()
data_txt = (self.input_hex.text() or "").strip()
length_txt = (self.input_length.text() or "").strip()
# --- helpers ---
def _to_int_any(s, default=None):
try:
return int(str(s), 0) # accepts "0xFE", "254"
except Exception:
return default
def _split_tokens(s: str):
return [t for t in s.replace(",", " ").split() if t]
# --- validate register ---
reg = _to_int_any(reg_txt)
if reg is None:
self.console.log("warning", "⚠️ Invalid or missing register (e.g. 0xFE).")
return
cmd = {
"action": action.lower(),
"reg": reg,
}
if action == "READ":
ln = _to_int_any(length_txt, default=1)
if ln is None or ln <= 0:
self.console.log("warning", "⚠️ Invalid length (must be >0).")
return
cmd["length"] = ln
elif action == "WRITE":
toks = _split_tokens(data_txt)
if not toks:
self.console.log(
"warning", "⚠️ WRITE requires data (e.g. 00 or 01,02,03)."
)
return
if len(toks) == 1:
v = _to_int_any(toks[0])
if v is None:
self.console.log(
"warning", "⚠️ Invalid byte. Use hex/dec like 0x00 or 0."
)
return
cmd["value"] = v
else:
cmd["bytes"] = data_txt # leave as string; logic will parse
self.i2c_logic.send_command(cmd, device_address=self._current_address_int())

@ -0,0 +1,639 @@
# -----------------------
# File: components/i2c/i2c_logic.py
#
# Purpose
# -------
# Unified I²C logic that supports:
# 1) One-shot command send under an exclusive "capture window"
# 2) A continuous background logger (thread) that politely backs off during capture
#
# Key Ideas
# ---------
# - We reuse the global coordinator (components.scheduler.coordinator) to mark an
# exclusive "capture window" while a one-shot I²C command is in flight.
# During this window, other sends are rejected, and the logger throttles
# output to keep the console readable and the UI snappy — same as UART.
# - The logger runs in its own daemon thread and performs periodic READs from a
# chosen register/length at a configurable interval.
# - All SMBus interactions are serialized by a single RLock so disconnect/close
# cannot race against in-flight reads/writes (prevents SIGSEGV).
#
# Tuning / Knobs
# --------------
# - self._logger_sleep_s: how often the logger loop yields when idle/throttled
# (default ~50 Hz). Increase for less CPU, decrease for more responsiveness.
# - _throttle_ns (inside _logger_loop): during capture, the logger emits at most
# once every 50 ms. Increase to further reduce noise.
# - The default capture window in send_command() is ~300 ms; adjust if needed.
#
# Logging Format
# --------------
# - TX (read): "➡️ | addr 0xAA | reg 0xRR | action READ | len N"
# - RX (n bytes): "⬅️ | addr 0xAA | reg 0xRR | data [0xV0, 0xV1, ...]"
# - TX (write1): "➡️ | addr 0xAA | reg 0xRR | action WRITE | value 0xVV"
# - TX (writeN): "➡️ | addr 0xAA | reg 0xRR | action WRITE | bytes [0x.., ...]"
# - OK (write): "✅ | addr 0xAA | reg 0xRR | status OK"
# - Errors: "❌ | addr 0xAA | reg 0xRR | error <text>"
#
# ------------------------------------------------------------------------------
from __future__ import annotations
import os
import time
import threading
from enum import Enum, auto
from typing import List, Optional, Union
import statistics
import smbus2
import config.config as config
from components.console.console_registry import log_main_console
from components.data.db import get_i2c_commands
# Shared coordinator + timebase (UART-style exclusivity/throttling)
from components.scheduler.coordinator import (
claim_i2c_capture,
release_i2c_capture,
i2c_capture_active,
)
from components.scheduler.timebase import now_ns
# ---------------------------
# Helpers (parsing and formatting)
# ---------------------------
def _coerce_int(val: Union[str, int], base: int = 10) -> int:
"""Turn '1', '0x40', '64' or 1 into int. Raises ValueError on failure."""
if isinstance(val, int):
return val
s = str(val).strip().lower()
if s.startswith("0x"):
return int(s, 16)
return int(s, base)
def _parse_hex_bytes_list(s: str) -> List[int]:
"""
Parse strings like '0x01,0x02,0x03' or '01 02 03' or '01, 02, 03' into a list of ints.
"""
if not s:
return []
raw = [tok.strip() for tok in s.replace(",", " ").split()]
out = []
for t in raw:
if not t:
continue
# accept '0x..' or plain hex/dec
out.append(int(t, 0))
return out
# ---------------------------
# State Machine
# ---------------------------
class I2CState(Enum):
DISCONNECTED = auto()
CONNECTED_IDLE = auto()
LOGGING = auto()
CAPTURE_ACTIVE = auto()
# ---------------------------
# Main logic
# ---------------------------
class I2CLogic:
"""
Single class for:
- Connecting / disconnecting an I²C bus
- Sending one-shot register read/write commands under an exclusive capture window
- Running a continuous logger on the same bus (polls reg/length at an interval)
Threading model
---------------
- The logger runs in a background daemon thread started by start_logger().
- All SMBus calls (read/write/close) are serialized by self._bus_lock (RLock).
- The command send path (send_command) runs in the UI/main thread, but it
briefly "claims" exclusivity via the shared coordinator so:
* Other sends are rejected during capture,
* The logger throttles its output during the capture window.
"""
def __init__(self):
# Connection
self.bus: Optional[smbus2.SMBus] = None
self.bus_id: int = 1 # Raspberry Pi default /dev/i2c-1
# Threading & safety
self._bus_lock = threading.RLock() # serialize ALL SMBus I/O
self._logger_thread: Optional[threading.Thread] = None
self._logger_running: bool = False
self._logger_sleep_s: float = 0.05 # ~20 Hz idle yield
self._throttle_ns: int = 50_000_000 # 50 ms between emissions during capture
# Logger configuration
self._log_addr: Optional[int] = None
self._log_reg: int = 0
self._log_len: int = 1
self._log_interval_s: float = 0.20 # default 200 ms between polls
self._shutting_down = False # prevent new work during disconnects
self._2_bytes_read_counter = (
0 # to cinfurm how many times we were in the read 2 bytes function
)
# State
self._state = I2CState.DISCONNECTED
# Logger injection (UI provides console logger)
self.log = lambda level, msg: None # default: drop logs until UI injects one
# Inject the console logger (must be a callable: (type, message) -> None)
# components/i2c/i2c_logic.py
def set_logger(self, log_func):
if callable(log_func):
self.log = log_func
# ----- convenience formatters -----
def _fmt_hex(self, v: int, width: int = 2) -> str:
return f"0x{v:0{width}X}"
def _fmt_bytes(self, data) -> str:
if data is None:
return "[]"
return "[" + ", ".join(f"0x{b:02X}" for b in data) + "]"
def _to_int_any(self, v, default=None):
try:
return int(str(v).strip(), 0) # accepts "0xFE" or "254"
except Exception:
return default
def _parse_hex_list(self, s: str) -> List[int]:
return _parse_hex_bytes_list(s)
# ---------------------------
# State & Channels
# ---------------------------
def state(self) -> I2CState:
return self._state
def get_channels(self) -> List[str]:
"""List available /dev/i2c-* buses as strings of bus ids."""
i2c_devices = []
try:
for entry in os.listdir("/dev"):
if entry.startswith("i2c-"):
try:
bus_id = int(entry.split("-")[1])
i2c_devices.append(str(bus_id))
except ValueError:
continue
except Exception as e:
self.log("warn", f"Could not list /dev for i2c buses: {e}")
return sorted(i2c_devices)
def scan_bus(self) -> List[str]:
"""
Scan the currently opened bus for devices.
Returns like ['0x40', '0x41'].
"""
found: List[str] = []
with self._bus_lock:
if self.bus is None:
return found
try:
for address in range(0x03, 0x78): # valid 7-bit range
try:
# write_quick is the classic probe; some devices NACK writes.
self.bus.write_quick(address)
found.append(f"0x{address:02X}")
except OSError:
continue
except Exception as e:
self.log("warn", f"Unexpected at 0x{address:02X}: {e}")
continue
self.log("info", f"Scan complete. Found devices: {found}")
except Exception as e:
self.log("error", f"Scan failed: {e}")
return found
# ---------------------------
# Connection lifecycle
# ---------------------------
def connect(self, port: Union[int, str]) -> bool:
"""
Open the I²C bus. 'port' can be an int or string ('1' -> /dev/i2c-1).
"""
if config.DEBUG_MODE:
self._state = I2CState.CONNECTED_IDLE
self.log("info", "DEBUG: simulate I²C connect")
self.log("success", f"🔗 Connected to I²C bus {port} (DEBUG)")
return True
try:
self.bus_id = _coerce_int(port)
with self._bus_lock:
self.bus = smbus2.SMBus(self.bus_id)
self._state = I2CState.CONNECTED_IDLE
self.log("success", f"🔗 Connected to I²C bus {self.bus_id}")
return True
except FileNotFoundError:
self.log("error", f"I²C bus {port} not found (no /dev/i2c-{port}).")
except Exception as e:
self.log("error", f"❌ Connection failed: {e}")
self._state = I2CState.DISCONNECTED
return False
def disconnect(self) -> bool:
"""
Stop logger (join) and close the bus cleanly.
Safe to call even if already disconnected.
"""
if config.DEBUG_MODE:
self._state = I2CState.DISCONNECTED
self.log("info", "DEBUG: simulate I²C disconnect")
return True
try:
# 1) Stop logger first so no thread touches the bus.
self.stop_logger()
# 2) Close the bus under the same lock used for I/O.
with self._bus_lock:
if self.bus:
try:
self.bus.close()
except Exception:
pass
self.bus = None
self._state = I2CState.DISCONNECTED
self.log("info", f"🔌 Disconnected from I²C bus {self.bus_id}")
return True
except Exception as e:
self.log("error", f"❌ Disconnection error: {e}")
return False
# ---------------------------
# One-shot command + capture window (UART-style)
# ---------------------------
def send_command(self, command, device_address=0x40):
"""
Execute a single I²C action under an exclusive capture window.
command: either
- str (raw): "01,02,03" (block write to reg 0x00)
- dict: {
'action'/'operation': 'read'|'write',
'reg'/'register': int|str,
'length': int (for read),
'value': int (single-byte write),
'bytes': '01,02,03' (block write string)
}
device_address: int or str ('0x40' accepted). If None, attempt to read
from command['address'] if present.
"""
if self._shutting_down:
self.log("warning", "⛔ I²C is shutting down; command ignored.")
return None
# must be connected in real mode
if not config.DEBUG_MODE and not self.bus:
self.log("error", "❌ | bus not connected")
return None
# Claim capture (exclusive) — reject if already in use
deadline_ns = now_ns() + 300_000_000 # ~300 ms default window
if not claim_i2c_capture(deadline_ns, owner="I2C_CMD"):
self.log("warning", "⛔ Busy: capture window active. Try again shortly.")
return None
prev_state = self._state
self._state = I2CState.CAPTURE_ACTIVE
try:
# Normalize device address
if device_address is not None:
addr = self._to_int_any(device_address, default=0x40)
elif isinstance(command, dict) and "address" in command:
addr = self._to_int_any(command["address"], default=0x40)
else:
self.log("error", "❌ | no device address provided")
return None
# RAW string mode (block write to reg 0x00)
if isinstance(command, str):
data = self._parse_hex_list(command)
self.log(
"info",
f"➡️ | addr {self._fmt_hex(addr)} | reg 0x00 | action WRITE | bytes {self._fmt_bytes(data)}",
)
with self._bus_lock:
if not config.DEBUG_MODE:
self.bus.write_i2c_block_data(addr, 0x00, data)
self.log(
"success", f"✅ | addr {self._fmt_hex(addr)} | reg 0x00 | status OK"
)
return True
# Dict mode
action = (
command.get("action") or command.get("operation") or "read"
).upper()
reg = self._to_int_any(
command.get("reg", command.get("register", 0)), default=0
)
if action == "READ":
length = int(command.get("length", 1))
self.log(
"info",
f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action READ | len {length}",
)
with self._bus_lock:
if not config.DEBUG_MODE:
if length <= 1:
data = [self.bus.read_byte_data(addr, reg)]
else:
data = self.bus.read_i2c_block_data(addr, reg, length)
else:
# synthetic data in DEBUG
data = [0xBE] if length <= 1 else list(range(length))
self.log(
"success",
f"⬅️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | data {self._fmt_bytes(data)}",
)
return data if length > 1 else data[0]
elif action == "WRITE":
if "bytes" in command and command["bytes"]:
data = self._parse_hex_list(command["bytes"])
self.log(
"info",
f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action WRITE | bytes {self._fmt_bytes(data)}",
)
with self._bus_lock:
if not config.DEBUG_MODE:
self.bus.write_i2c_block_data(addr, reg, data)
self.log(
"success",
f"✅ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | status OK",
)
return True
else:
value = self._to_int_any(command.get("value", 0))
self.log(
"info",
f"➡️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | action WRITE | value {self._fmt_hex(value)}",
)
with self._bus_lock:
if not config.DEBUG_MODE:
self.bus.write_byte_data(addr, reg, value)
self.log(
"success",
f"✅ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | status OK",
)
return True
else:
self.log(
"warning",
f"⚠️ | addr {self._fmt_hex(addr)} | reg {self._fmt_hex(reg)} | unknown action '{action}'",
)
return None
except Exception as e:
self.log("error", f"❌ | error: {e}")
return None
finally:
release_i2c_capture()
# restore state (back to LOGGING if logger is on)
self._state = (
I2CState.LOGGING if self._logger_running else I2CState.CONNECTED_IDLE
)
# ---------------------------
# Hot-path primitive (no logging, no waits)
# ---------------------------
def read_2_bytes(self, addr_7bit: int, reg: int) -> dict:
"""
Fast, on-demand I²C read of exactly 2 bytes from 'reg' at 7-bit address.
- Immediate-or-skip capture; no waits, no prints, no retries.
"""
# must be connected in real (non-debug) mode
if not config.DEBUG_MODE and self.bus is None:
return {"status": "ERR_NOT_CONNECTED", "addr": addr_7bit, "reg": reg}
# Give the coordinator a tiny future window so claim is valid
deadline_ns = now_ns() + 5_000
if not claim_i2c_capture(deadline_ns, owner="I2C_PKT"):
return {"status": "SKIPPED_BUSY", "addr": addr_7bit, "reg": reg}
try:
with self._bus_lock:
data = self.bus.read_i2c_block_data(addr_7bit, reg, 2)
if not isinstance(data, list) or len(data) != 2:
return {"status": "ERR", "addr": addr_7bit, "reg": reg}
return {
"status": "OK",
"bytes": [data[0], data[1]],
}
except Exception:
return {"status": "ERR", "addr": addr_7bit, "reg": reg}
finally:
release_i2c_capture()
def measure_zero_raw14(self, duration_ms: int = 200, interval_ms: int = 10):
"""
Measure zero for the I²C angle sensor.
Saves the result bytes into config.SESSION_ZERO = [MSB, LSB] (big-endian).
Returns the median raw14 (int) or None.
"""
samples = []
deadline = time.time() + (duration_ms / 1000.0)
try:
if self.connect(1):
self.log("info", "I²C auto-initialized on /dev/i2c-1 to set zero")
while time.time() < deadline:
rep = self.read_2_bytes(
0x40, 0xFE
) # dict: {"status":"OK","bytes":[b0,b1]}
if isinstance(rep, dict) and rep.get("status") == "OK":
by = rep.get("bytes")
if isinstance(by, list) and len(by) == 2:
raw16 = ((by[0] & 0xFF) << 8) | (by[1] & 0xFF) # big-endian
raw14 = raw16 & 0x3FFF
samples.append(raw14)
time.sleep(interval_ms / 1000.0)
if not samples:
self.log("warn", "I²C zero measurement collected no valid samples")
return None
med = int(statistics.median(samples))
msb = (med >> 8) & 0xFF
lsb = med & 0xFF
# Store exactly the two bytes
config.SESSION_ZERO = [msb, lsb]
self.log(
"info", f"I²C zero raw14={med} -> SESSION_ZERO={msb:02X} {lsb:02X}"
)
return med
except Exception as e:
self.log("error", f"I²C zero measurement exception: {e}")
return None
# ---------------------------
# Continuous Logger (thread) — UART-style
# ---------------------------
def start_logger(self, device_address, reg, length, interval_ms: int = 200) -> bool:
"""
Start the background I²C logger thread.
- If already running: no-op, returns True.
- If not connected (and not in DEBUG): returns False.
Behavior:
- At each interval, performs a READ of 'length' bytes at 'reg' from 'device_address'.
- During a capture window, emissions are throttled to at most once every _throttle_ns.
"""
if self._logger_running:
self.log("info", " I²C logger already running.")
return True
if not config.DEBUG_MODE and not self.bus:
self.log("error", "⚠️ Cannot start I²C logger: not connected.")
return False
addr = self._to_int_any(device_address)
r = self._to_int_any(reg)
ln = self._to_int_any(length, default=1)
if addr is None or r is None or ln is None or ln <= 0:
self.log("warning", "⚠️ Invalid logger params (addr/reg/length).")
return False
self._log_addr = addr
self._log_reg = r
self._log_len = ln
self._log_interval_s = max(0.001, (interval_ms or 200) / 1000.0)
self._logger_running = True
self._logger_thread = threading.Thread(
target=self._logger_loop, name="I2CLogger", daemon=True
)
self._logger_thread.start()
self._state = I2CState.LOGGING
self.log(
"success",
f"🟢 I²C logger started | addr 0x{addr:02X} | reg 0x{r:02X} | len {ln} | interval {interval_ms}ms",
)
return True
def stop_logger(self) -> bool:
"""
Stop the background I²C logger thread if running.
Joins with a short timeout so shutdown remains responsive.
"""
if not self._logger_running:
return True
self._logger_running = False
t = self._logger_thread
self._logger_thread = None
# Wait up to ~2s for a clean exit (covers a read thats mid-call + pacing sleep)
if t and t.is_alive():
t.join(timeout=2.0)
# If still alive, wait a tiny bit more — but don't hang the UI
if t.is_alive():
time.sleep(0.05)
if self.bus:
self._state = I2CState.CONNECTED_IDLE
self.log("info", "🔴 I²C logger stopped.")
return True
def _logger_loop(self):
"""
Background poller (daemon thread):
- DEBUG_MODE: emits synthetic lines periodically so you can test the UI without hardware.
- Real bus: performs read(s) at ~_log_interval_s cadence.
- During capture: throttle emissions (drop extras) to one per _throttle_ns.
"""
addr = self._log_addr
r = self._log_reg
ln = self._log_len
pace = self._log_interval_s
last_emit_ns = now_ns()
try:
while self._logger_running:
try:
# Respect capture throttle (like UART)
if self._shutting_down:
break # exit ASAP on shutdown
if i2c_capture_active():
now = now_ns()
if now - last_emit_ns < self._throttle_ns:
time.sleep(self._logger_sleep_s)
continue
last_emit_ns = now
# TX line
self.log(
"info",
f"➡️ | addr 0x{addr:02X} | reg 0x{r:02X} | action READ | len {ln}",
)
# Perform read under the same bus lock as send_command
with self._bus_lock:
if not self.bus and not config.DEBUG_MODE:
break
if not self.bus and not config.DEBUG_MODE:
break
if config.DEBUG_MODE:
data = [0xBE] if ln <= 1 else list(range(ln))
else:
if ln <= 1:
data = [self.bus.read_byte_data(addr, r)]
else:
data = self.bus.read_i2c_block_data(addr, r, ln)
# RX line
self.log(
"success",
f"⬅️ | addr 0x{addr:02X} | reg 0x{r:02X} | data {self._fmt_bytes(data)}",
)
except Exception as e:
self.log(
"error", f"❌ | addr 0x{addr:02X} | reg 0x{r:02X} | error {e}"
)
# pacing
time.sleep(pace or self._logger_sleep_s)
finally:
self._logger_running = False
# ---------------------------
# Commands catalog passthrough
# ---------------------------
def get_predefined_commands(self):
"""
Load predefined I²C commands for your command table.
The DB returns rows mapped by components.data.db.get_i2c_commands().
"""
return get_i2c_commands()

@ -0,0 +1,151 @@
# File: components/i2c/i2c_polling.py
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel, QPushButton, QHBoxLayout
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QPainter, QColor, QPen
from components.console.console_registry import log_main_console
from smbus2 import SMBus
import config.config as config
import math
AS5048_ADDR = 0x40
REG_ANGLE_MSB = 0xFE
REG_ANGLE_LSB = 0xFF
class AngleGauge(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumSize(200, 200)
self.angle = 0 # degrees
def set_angle(self, angle):
self.angle = max(0.0, min(90.0, angle))
self.update()
def paintEvent(self, event):
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
rect = self.rect()
width = rect.width()
height = rect.height()
center_x = width // 2
bottom_y = height - 10
length = height - 40
# Draw vertical Y axis
pen = QPen(QColor(180, 180, 180), 2)
painter.setPen(pen)
painter.drawLine(center_x, bottom_y, center_x, bottom_y - length)
# Draw needle
radians = math.radians(self.angle)
end_x = int(center_x + length * math.sin(radians))
end_y = int(bottom_y - length * math.cos(radians))
pen = QPen(QColor(0, 100, 200), 4)
painter.setPen(pen)
painter.drawLine(center_x, bottom_y, end_x, end_y)
# Draw angle value
painter.setPen(Qt.GlobalColor.black)
painter.drawText(
rect,
int(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignHCenter),
f"{self.angle:.1f}°",
)
class I2CPollingWidget(QWidget):
def __init__(self, bus_id=1, parent=None):
super().__init__(parent)
self.label = QLabel("AS5048A Angle Sensor")
self.gauge = AngleGauge()
self.zero_raw = None
self.start_button = QPushButton("Start")
self.stop_button = QPushButton("Stop")
self.set_zero_button = QPushButton("Set Zero")
self.start_button.clicked.connect(self.start_polling)
self.stop_button.clicked.connect(self.stop_polling)
self.set_zero_button.clicked.connect(self.set_zero)
button_layout = QHBoxLayout()
button_layout.addWidget(self.start_button)
button_layout.addWidget(self.stop_button)
button_layout.addWidget(self.set_zero_button)
layout = QVBoxLayout()
layout.addWidget(self.label)
layout.addWidget(self.gauge)
layout.addLayout(button_layout)
self.setLayout(layout)
self.bus_id = bus_id
self.bus = None
self.timer = QTimer(self)
self.timer.timeout.connect(self.read_angle)
if not config.DEBUG_MODE:
try:
self.bus = SMBus(self.bus_id)
log_main_console("success", f"AS5048A ready on bus {self.bus_id}")
except Exception as e:
log_main_console("error", f"Failed to open I2C bus {self.bus_id}: {e}")
else:
log_main_console("info", "DEBUG_MODE: fake polling setup complete")
def start_polling(self):
self.timer.start(50)
log_main_console("info", "Started I2C angle polling")
def stop_polling(self):
self.timer.stop()
log_main_console("info", "Stopped I2C angle polling")
def set_zero(self):
raw = self._read_raw_angle()
if raw is not None:
self.zero_raw = raw
log_main_console("info", f"Zero set to raw value: {self.zero_raw}")
def _read_raw_angle(self):
if config.DEBUG_MODE:
import random
return random.randint(0, 16383)
try:
msb = self.bus.read_byte_data(AS5048_ADDR, REG_ANGLE_MSB)
lsb = self.bus.read_byte_data(AS5048_ADDR, REG_ANGLE_LSB)
raw = ((msb << 6) | (lsb & 0x3F)) & 0x3FFF
return raw
except Exception as e:
log_main_console("error", f"I2C read failed: {e}")
return None
def read_angle(self):
current_raw = self._read_raw_angle()
if current_raw is None or self.zero_raw is None:
return
# Calculate signed delta
diff = (current_raw - self.zero_raw + 8192) % 16384 - 8192
angle_deg = -((diff * 360.0) / 16384.0)
# Clamp to 090° range for display
angle_clamped = max(0.0, min(90.0, angle_deg))
log_main_console(
"info",
f"Read raw: {current_raw}, Δ: {diff}, Degrees: {angle_deg:.2f}° → Gauge: {angle_clamped:.2f}°",
)
self.gauge.set_angle(angle_clamped)
def closeEvent(self, event):
self.timer.stop()
if self.bus:
self.bus.close()
super().closeEvent(event)

@ -0,0 +1,342 @@
from PyQt6.QtWidgets import (
QWidget,
QVBoxLayout,
QHBoxLayout,
QPushButton,
QComboBox,
QLineEdit,
QSplitter,
)
from PyQt6.QtCore import Qt
from PyQt6.QtCore import QObject, pyqtSignal
from components.console.console_ui import console_widget
from components.console.console_registry import log_main_console
import components.items.elements as elements
from components.commands.command_table_ui import command_table_widget
import config.config as config
from components.i2c.i2c_logic import I2CLogic, _coerce_int
from components.i2c.i2c_command_editor import I2CCommandEditorDialog
from components.data import db
class _SafeConsoleProxy(QObject):
log_signal = pyqtSignal(str, str)
def __init__(self, console):
super().__init__()
self.log_signal.connect(console.log)
def __call__(self, level, msg):
# called from worker thread → emits safely into UI thread
self.log_signal.emit(level, msg)
class I2CHandler:
"""Adapter used by command_table_widget → forwards to the I2CWidget."""
def __init__(self, widget: "I2CWidget"):
self.w = widget
def get_command_list(self):
return self.w.i2c_logic.get_predefined_commands()
def send_command(self, command: dict):
self.w.send_command(command)
def add_command(self):
dialog = I2CCommandEditorDialog()
if dialog.exec():
command = dialog.get_data()
db.add_i2c_command(command)
def modify_command(self, command):
dialog = I2CCommandEditorDialog(command=command)
if dialog.exec():
updated = dialog.get_data()
updated["id"] = command["id"]
db.modify_i2c_command(updated)
def delete_command(self, command):
db.delete_i2c_command(command)
class I2CWidget(QWidget):
"""Drop-in single-page I2C widget with table | console splitter."""
def __init__(self, parent=None):
super().__init__(parent)
self.i2c_logic = I2CLogic()
self.handler = I2CHandler(self)
self.commands = self.i2c_logic.get_predefined_commands()
self.comboboxes = {}
self.connection_status = False
self.init_ui()
def init_ui(self):
# Top controls
top_controls = QWidget()
top_controls_layout = QHBoxLayout(top_controls)
top_controls_layout.setContentsMargins(0, 0, 0, 0)
top_controls_layout.setSpacing(12)
top_controls_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
# Port
self.comboboxes["port"] = QComboBox()
self.comboboxes["port"].addItems(self.i2c_logic.get_channels())
top_controls_layout.addWidget(
elements.label_and_widget("Port", self.comboboxes["port"])
)
self.button_refresh_channels = elements.create_icon_button(
config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4
)
top_controls_layout.addWidget(self.button_refresh_channels)
# Address (empty until connected → scan)
self.comboboxes["address"] = QComboBox()
top_controls_layout.addWidget(
elements.label_and_widget("Address", self.comboboxes["address"])
)
self.button_refresh_address = elements.create_icon_button(
config.REFRESH_BUTTON_ICON_LINK, icon_size=30, border_size=4
)
top_controls_layout.addWidget(self.button_refresh_address)
# Connect / Disconnect
self.button_connect = QPushButton("Connect")
top_controls_layout.addWidget(
elements.label_and_widget("", self.button_connect)
)
self.button_disconnect = QPushButton("Disconnect")
top_controls_layout.addWidget(
elements.label_and_widget("", self.button_disconnect)
)
# Command table
self.command_table = command_table_widget(
commands=self.commands, handler=self.handler
)
col1_widget = QWidget()
col1_layout = QVBoxLayout(col1_widget)
col1_layout.setContentsMargins(0, 0, 0, 0)
col1_layout.setSpacing(4)
col1_layout.addWidget(self.command_table)
# Input + Send
input_line_layout = QHBoxLayout()
input_line_layout.setContentsMargins(0, 0, 0, 0)
input_line_layout.setSpacing(4)
self.comboboxes["action"] = QComboBox()
self.comboboxes["action"].addItems(["Read", "Write"])
input_line_layout.addWidget(
elements.label_and_widget("Action", self.comboboxes["action"])
)
self.input_register = QLineEdit()
self.input_register.setPlaceholderText("0x04")
input_line_layout.addWidget(
elements.label_and_widget("Register", self.input_register)
)
self.input_hex = QLineEdit()
self.input_hex.setPlaceholderText(
"Hex bytes (e.g. 0x01,0x02,0x03) → block write to reg 0x00"
)
input_line_layout.addWidget(elements.label_and_widget("Data", self.input_hex))
self.input_length = QLineEdit()
self.input_length.setPlaceholderText("1")
input_line_layout.addWidget(
elements.label_and_widget("Length", self.input_length)
)
self.button_send_raw = QPushButton("Send Raw")
input_line_layout.addWidget(elements.label_and_widget("", self.button_send_raw))
# Console
self.console = console_widget()
self.i2c_logic.set_logger(self.console.log)
console_stack_widget = QWidget()
console_stack_layout = QVBoxLayout(console_stack_widget)
console_stack_layout.setContentsMargins(0, 0, 0, 0)
console_stack_layout.setSpacing(4)
console_stack_layout.addLayout(input_line_layout)
console_stack_layout.addWidget(self.console)
# Splitter: table | console
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.addWidget(col1_widget)
splitter.addWidget(console_stack_widget)
splitter.setSizes([740, 1200])
splitter.setStretchFactor(0, 0)
splitter.setStretchFactor(1, 1)
# Main layout
main_layout = QVBoxLayout()
main_layout.setContentsMargins(4, 4, 4, 4)
main_layout.setSpacing(6)
main_layout.addWidget(top_controls)
main_layout.addWidget(splitter, stretch=1)
self.setLayout(main_layout)
# Signals
self.button_refresh_channels.clicked.connect(self.refresh_channels)
self.button_refresh_address.clicked.connect(self.refresh_address)
self.button_connect.clicked.connect(self.connect)
self.button_disconnect.clicked.connect(self.disconnect)
self.button_send_raw.clicked.connect(self.send_command_raw)
self.disconnected_enable_status()
# --- UI state toggles ---
def disconnected_enable_status(self):
elements.set_enabled_state(True, self.button_refresh_channels, grayOut=False)
elements.set_enabled_state(True, self.comboboxes["port"], grayOut=False)
elements.set_enabled_state(True, self.command_table, grayOut=False)
elements.set_enabled_state(False, self.input_hex, grayOut=True)
elements.set_enabled_state(False, self.button_send_raw, grayOut=True)
elements.set_enabled_state(False, self.button_disconnect, grayOut=True)
elements.set_enabled_state(True, self.button_connect, grayOut=False)
elements.set_enabled_state(False, self.button_refresh_address, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["address"], grayOut=True)
elements.set_enabled_state(False, self.comboboxes["action"], grayOut=True)
elements.set_enabled_state(False, self.input_register, grayOut=True)
def connected_enable_status(self):
elements.set_enabled_state(False, self.button_refresh_channels, grayOut=True)
elements.set_enabled_state(False, self.comboboxes["port"], grayOut=True)
elements.set_enabled_state(True, self.command_table, grayOut=False)
elements.set_enabled_state(True, self.input_hex, grayOut=False)
elements.set_enabled_state(True, self.button_send_raw, grayOut=False)
elements.set_enabled_state(True, self.button_disconnect, grayOut=False)
elements.set_enabled_state(False, self.button_connect, grayOut=True)
elements.set_enabled_state(True, self.button_refresh_address, grayOut=False)
elements.set_enabled_state(True, self.comboboxes["address"], grayOut=False)
elements.set_enabled_state(True, self.comboboxes["action"], grayOut=False)
elements.set_enabled_state(True, self.input_register, grayOut=False)
# --- Buttons ---
def connect(self):
log_main_console("info", "🔗 Connecting...")
success = self.i2c_logic.connect(self.comboboxes["port"].currentText())
if success:
self.connected_enable_status()
self.command_table.set_connected_state()
self.refresh_address(silent=True)
self.connection_status = True
else:
elements.flash_button(
self.button_connect, flash_style="background-color: red;"
)
def disconnect(self):
log_main_console("info", "🔌 Disconnecting...")
if self.i2c_logic.stop_logger():
self.disconnected_enable_status()
self.command_table.set_disconnected_state()
self.connection_status = False
self.refresh_channels(silent=True)
else:
elements.flash_button(
self.button_disconnect, flash_style="background-color: red;"
)
def refresh_channels(self, silent: bool = False):
log_main_console("info", "🔄 Refreshing buses...")
self.comboboxes["port"].clear()
ports = self.i2c_logic.get_channels()
if ports:
self.comboboxes["port"].addItems(ports)
if not silent:
elements.flash_button(self.button_refresh_channels)
log_main_console("success", "🔄 Bus list refreshed")
else:
elements.flash_button(
self.button_refresh_channels, flash_style="background-color: red;"
)
log_main_console("warn", "No I2C buses found")
def refresh_address(self, silent: bool = False):
log_main_console("info", "🔄 Scanning bus for devices...")
self.comboboxes["address"].clear()
addresses = self.i2c_logic.scan_bus()
if addresses:
self.comboboxes["address"].addItems(addresses)
if not silent:
elements.flash_button(self.button_refresh_address)
log_main_console("success", "🔄 Bus scan complete")
else:
elements.flash_button(
self.button_refresh_address, flash_style="background-color: red;"
)
log_main_console("info", "No devices detected")
def get_current_config(self):
return {key: cb.currentText() for key, cb in self.comboboxes.items()}
def _current_address_int(self) -> int:
txt = self.comboboxes["address"].currentText().strip() or "0x40"
return _coerce_int(txt)
def send_command(self, command: dict):
self.i2c_logic.send_command(command, device_address=self._current_address_int())
def send_command_raw(self):
action = self.comboboxes["action"].currentText().strip().upper()
reg_txt = (self.input_register.text() or "").strip()
data_txt = (self.input_hex.text() or "").strip()
length_txt = (self.input_length.text() or "").strip()
# --- helpers ---
def _to_int_any(s, default=None):
try:
return int(str(s), 0) # accepts "0xFE", "254"
except Exception:
return default
def _split_tokens(s: str):
return [t for t in s.replace(",", " ").split() if t]
# --- validate register ---
reg = _to_int_any(reg_txt)
if reg is None:
self.console.log("warning", "⚠️ Invalid or missing register (e.g. 0xFE).")
return
cmd = {
"action": action.lower(),
"reg": reg,
}
if action == "READ":
ln = _to_int_any(length_txt, default=1)
if ln is None or ln <= 0:
self.console.log("warning", "⚠️ Invalid length (must be >0).")
return
cmd["length"] = ln
elif action == "WRITE":
toks = _split_tokens(data_txt)
if not toks:
self.console.log(
"warning", "⚠️ WRITE requires data (e.g. 00 or 01,02,03)."
)
return
if len(toks) == 1:
v = _to_int_any(toks[0])
if v is None:
self.console.log(
"warning", "⚠️ Invalid byte. Use hex/dec like 0x00 or 0."
)
return
cmd["value"] = v
else:
cmd["bytes"] = data_txt # leave as string; logic will parse
self.i2c_logic.send_command(cmd, device_address=self._current_address_int())

218
run.py

@ -56,7 +56,7 @@ class RunExecutor:
Executes a single RUN.
A RUN consists of:
- Send UART command
- Send UART command OR execute I2C command
- Collect UART packets (with timestamps)
- Trigger I2C reads via callback (correlated timestamps)
- Wait for stop condition
@ -74,6 +74,7 @@ class RunExecutor:
self.db_conn = db_connection
self.i2c_readings = [] # Storage for I2C readings from callback
self.i2c_failures = 0 # Counter for I2C read failures
self.i2c_zero_reference = 0 # Absolute angle used as zero (0 = not zeroed)
def execute_run(
self,
@ -391,6 +392,145 @@ class RunExecutor:
return ("error", 0, f"Exception during run: {str(e)}")
def execute_i2c_command(
self,
session_id: str,
session_name: str,
run_no: int,
command_id: int,
command_name: str,
operation: str,
device_address: int,
register: int,
hex_string: str,
i2c_port: Optional[I2CHandle],
raw_data_callback = None
) -> Tuple[str, int, str]:
"""
Execute a single I2C command.
Args:
session_id: Session ID
session_name: Session name
run_no: Run number
command_id: I2C command ID from database
command_name: Command name (e.g., "zero", "read angle")
operation: Operation type (e.g., "read", "write", "zero")
device_address: I2C device address (e.g., 0x40)
register: Register address (e.g., 0xFE)
hex_string: Number of bytes to read/write
i2c_port: I2C handle
raw_data_callback: Callback for status updates
Returns:
(status, data_count, error_msg)
"""
try:
if not i2c_port:
return ("error", 0, "I2C port not available")
run_start_ns = time.time_ns()
# Parse hex_string as byte count for reads
try:
byte_count = int(hex_string, 16) if isinstance(hex_string, str) else int(hex_string)
except:
byte_count = 2 # Default to 2 bytes
# Special handling for "zero" operation
if operation.lower() == "zero" or "zero" in command_name.lower():
return self._execute_i2c_zero(
session_id=session_id,
session_name=session_name,
run_no=run_no,
command_id=command_id,
device_address=device_address,
register=register,
i2c_port=i2c_port,
run_start_ns=run_start_ns,
raw_data_callback=raw_data_callback
)
# Regular I2C read
elif operation.lower() == "read":
if raw_data_callback:
raw_data_callback("INFO", f"Read: addr=0x{device_address:02X} reg=0x{register:02X} len={byte_count}")
status, data = i2c_read_block(i2c_port, device_address, register, byte_count)
if status == I2CStatus.OK:
hex_data = ' '.join(f'{b:02X}' for b in data)
if raw_data_callback:
raw_data_callback("RX", f"{hex_data}")
return ("success", len(data), "")
else:
return ("error", 0, f"I2C read failed: {status}")
else:
return ("error", 0, f"Unsupported I2C operation: {operation}")
except Exception as e:
return ("error", 0, f"Exception during I2C command: {str(e)}")
def _execute_i2c_zero(
self,
session_id: str,
session_name: str,
run_no: int,
command_id: int,
device_address: int,
register: int,
i2c_port: I2CHandle,
run_start_ns: int,
raw_data_callback = None
) -> Tuple[str, int, str]:
"""
Execute I2C zeroing: take 50 samples, calculate median as zero reference.
Returns:
(status, sample_count, error_msg)
"""
import statistics
if raw_data_callback:
raw_data_callback("INFO", "Zeroing: Collecting 50 samples...")
samples = []
duration_ms = 500 # 500ms total
interval_ms = 10 # 10ms between samples (50 samples)
deadline = time.time() + (duration_ms / 1000.0)
while time.time() < deadline and len(samples) < 50:
status, data = i2c_read_block(i2c_port, device_address, register, 2)
if status == I2CStatus.OK and len(data) == 2:
# Convert to 14-bit raw value (big-endian)
raw16 = ((data[0] & 0xFF) << 8) | (data[1] & 0xFF)
raw14 = raw16 & 0x3FFF
samples.append(raw14)
time.sleep(interval_ms / 1000.0)
if not samples:
if raw_data_callback:
raw_data_callback("ERROR", "I2C Zeroing failed: No valid samples")
return ("error", 0, "No valid samples collected")
# Calculate median (better than average for outlier rejection)
median_raw14 = int(statistics.median(samples))
self.i2c_zero_reference = median_raw14
# Convert to angle (360° / 16384 counts)
zero_angle_deg = (median_raw14 * 360.0) / 16384.0
if raw_data_callback:
raw_data_callback("INFO", f"✓ Zero set: raw14={median_raw14} ({zero_angle_deg:.2f}°) from {len(samples)} samples")
# Zero reference stored in memory only (self.i2c_zero_reference)
# Will be saved to telemetry tables when UART commands execute
return ("success", len(samples), "")
def _parse_hex_string(self, hex_str: str) -> Optional[bytes]:
"""
Parse hex string to bytes.
@ -436,8 +576,8 @@ class RunExecutor:
cursor.execute("""
INSERT INTO telemetry_raw (
session_id, session_name, run_no, run_command_id,
t_ns, time_ms, uart_raw_packet, i2c_raw_bytes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
t_ns, time_ms, uart_raw_packet, i2c_raw_bytes, i2c_zero_ref
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
session_id,
session_name,
@ -446,7 +586,8 @@ class RunExecutor:
packet_info.start_timestamp,
time_ms,
packet_info.data,
i2c_bytes # Can be None if no I2C
i2c_bytes, # Can be None if no I2C
self.i2c_zero_reference # Zero reference (0 if not zeroed)
))
# Save to telemetry_decoded (main data)
@ -455,15 +596,16 @@ class RunExecutor:
cursor.execute("""
INSERT INTO telemetry_decoded (
session_id, session_name, run_no, run_command_id,
t_ns, time_ms
) VALUES (?, ?, ?, ?, ?, ?)
t_ns, time_ms, i2c_zero_ref
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
session_id,
session_name,
run_no,
run_command_id,
packet_info.start_timestamp,
time_ms
time_ms,
self.i2c_zero_reference # Zero reference (0 if not zeroed)
))
# TODO: When decoder is fully implemented, also save:
@ -490,6 +632,7 @@ def execute_run(
i2c_register: int = 0xFE,
stop_timeout_ms: int = 5000,
grace_timeout_ms: int = 1500,
i2c_zero_ref: int = 0,
raw_data_callback = None
) -> Tuple[str, int, str]:
"""
@ -506,14 +649,18 @@ def execute_run(
uart_logger_port: UART logger port (RX for telemetry, optional)
i2c_port: I2C port (optional)
packet_config: Packet detection configuration
i2c_address: I2C device address
i2c_register: I2C register address
stop_timeout_ms: Stop condition timeout
grace_timeout_ms: Grace period before first packet
i2c_zero_ref: I2C zero reference (0 = not zeroed)
raw_data_callback: Callback for raw data display (direction, hex_string)
Returns:
(status, packet_count, error_msg)
"""
executor = RunExecutor(db_connection)
executor.i2c_zero_reference = i2c_zero_ref # Set zero reference from session
return executor.execute_run(
session_id=session_id,
session_name=session_name,
@ -532,6 +679,63 @@ def execute_run(
)
def execute_i2c_command(
db_connection: sqlite3.Connection,
session_id: str,
session_name: str,
run_no: int,
command_id: int,
command_name: str,
operation: str,
device_address: int,
register: int,
hex_string: str,
i2c_port: Optional[I2CHandle],
i2c_zero_ref: int = 0,
raw_data_callback = None
) -> Tuple[str, int, str, int]:
"""
Execute a single I2C command (convenience function).
Args:
db_connection: Database connection
session_id: Session ID
session_name: Session name
run_no: Run number
command_id: I2C command ID
command_name: Command name
operation: Operation type (read/write/zero)
device_address: I2C device address
register: Register address
hex_string: Byte count or data
i2c_port: I2C handle
i2c_zero_ref: Current I2C zero reference (0 = not zeroed)
raw_data_callback: Callback for status updates
Returns:
(status, data_count, error_msg, updated_i2c_zero_ref)
"""
executor = RunExecutor(db_connection)
executor.i2c_zero_reference = i2c_zero_ref # Set zero reference from session
status, data_count, error_msg = executor.execute_i2c_command(
session_id=session_id,
session_name=session_name,
run_no=run_no,
command_id=command_id,
command_name=command_name,
operation=operation,
device_address=device_address,
register=register,
hex_string=hex_string,
i2c_port=i2c_port,
raw_data_callback=raw_data_callback
)
# Return updated zero reference (may have changed if zero command)
return (status, data_count, error_msg, executor.i2c_zero_reference)
if __name__ == "__main__":
print("Run Module")
print("=" * 60)

@ -60,7 +60,7 @@ from i2c.i2c_kit.i2c_core import (
)
# Run executor
from run import execute_run
from run import execute_run, execute_i2c_command
# Database manager
from database.init_database import DatabaseManager
@ -139,7 +139,13 @@ class Session(QObject):
# Phase execution (multi-phase support: Init, Execute, De-init)
self.phases: List[Dict[str, Any]] = [] # List of phase configs
self.total_commands: int = 0 # Total commands across all phases
self.total_uart_runs: int = 0 # Total UART commands (actual runs)
self.total_i2c_commands: int = 0 # Total I2C commands (not runs)
self.current_command_index: int = 0 # Global command counter
self.current_phase_name: str = "" # Current executing phase name
# I2C zero reference (persists across all runs in session)
self.i2c_zero_reference: int = 0 # Absolute angle used as zero (0 = not zeroed)
# Execution control flags
self.is_running: bool = False
@ -243,6 +249,8 @@ class Session(QObject):
# Storage for phase configurations
self.phases = [] # List of dicts: {'name': str, 'commands': list, 'profile_id': int}
self.total_commands = 0
self.total_uart_runs = 0
self.total_i2c_commands = 0
# Load Init phase
if init_session_id is not None:
@ -252,6 +260,12 @@ class Session(QObject):
if phase_config:
self.phases.append(phase_config)
self.total_commands += len(phase_config['commands'])
# Count by type
for cmd in phase_config['commands']:
if cmd.get('command_type', 'uart') == 'i2c':
self.total_i2c_commands += 1
else:
self.total_uart_runs += 1
# Load Execute phase
if execute_session_id is not None:
@ -261,6 +275,12 @@ class Session(QObject):
if phase_config:
self.phases.append(phase_config)
self.total_commands += len(phase_config['commands'])
# Count by type
for cmd in phase_config['commands']:
if cmd.get('command_type', 'uart') == 'i2c':
self.total_i2c_commands += 1
else:
self.total_uart_runs += 1
# Load De-init phase
if deinit_session_id is not None:
@ -270,6 +290,12 @@ class Session(QObject):
if phase_config:
self.phases.append(phase_config)
self.total_commands += len(phase_config['commands'])
# Count by type
for cmd in phase_config['commands']:
if cmd.get('command_type', 'uart') == 'i2c':
self.total_i2c_commands += 1
else:
self.total_uart_runs += 1
# Check at least one phase loaded
if len(self.phases) == 0:
@ -295,7 +321,7 @@ class Session(QObject):
self.status_changed.emit(f"Multi-phase session loaded: {self.session_name}")
self.status_changed.emit(f"Interface: {self.interface_config['profile_name']}")
self.status_changed.emit(f"Phases: {phase_summary}")
self.status_changed.emit(f"Total commands: {self.total_commands}")
self.status_changed.emit(f"Total: {self.total_uart_runs} UART runs, {self.total_i2c_commands} I2C commands")
return True
@ -352,7 +378,30 @@ class Session(QObject):
self.error_occurred.emit(f"{phase_name}: Command missing command_id")
return (False, None)
# Check if command exists and load details
# Check command type (default to 'uart' for backwards compatibility)
command_type = cmd.get('command_type', 'uart')
if command_type == 'i2c':
# Load I2C command from database
cursor = self.db_conn.execute("""
SELECT command_name, operation, register, hex_string, device_address
FROM i2c_commands
WHERE command_id = ?
""", (cmd_id,))
row = cursor.fetchone()
if not row:
self.error_occurred.emit(f"{phase_name}: I2C command {cmd_id} not found")
return (False, None)
# Store I2C command details
cmd['command_name'] = row[0]
cmd['operation'] = row[1]
cmd['register'] = row[2]
cmd['hex_string'] = row[3]
cmd['device_address'] = row[4]
else:
# Load UART command from database (default)
cursor = self.db_conn.execute("""
SELECT command_name, hex_string
FROM uart_commands
@ -364,7 +413,7 @@ class Session(QObject):
self.error_occurred.emit(f"{phase_name}: UART command {cmd_id} not found")
return (False, None)
# Store command details
# Store UART command details
cmd['command_name'] = row[0]
cmd['hex_string'] = row[1]
@ -675,6 +724,7 @@ class Session(QObject):
self.pause_queued = False
self.stop_queued = False
self.current_command_index = 0
self.i2c_zero_reference = 0 # Reset zero reference for new session
# Emit session started
self.session_started.emit(self.current_session_id)
@ -710,11 +760,13 @@ class Session(QObject):
ALL remaining phases.
"""
try:
global_cmd_index = 0 # Track overall command number
global_cmd_index = 0 # Track UART run number
total_commands_executed = 0 # Track all commands (UART + I2C)
# Loop through phases
for phase_index, phase in enumerate(self.phases, 1):
phase_name = phase['name']
self.current_phase_name = phase_name # Track current phase
phase_commands = phase['commands']
total_phases = len(self.phases)
@ -723,11 +775,9 @@ class Session(QObject):
# Loop through commands in this phase
for cmd_index_in_phase, cmd in enumerate(phase_commands, 1):
global_cmd_index += 1
self.current_command_index = global_cmd_index
total_commands_executed += 1 # Count all commands
# ===============================================================
# 1. Check if stop was queued (before starting new run)
# 1. Check if stop was queued (before starting new command)
# ===============================================================
if self.stop_queued:
@ -736,20 +786,64 @@ class Session(QObject):
return
# ===============================================================
# 2. Emit command started
# 2. Check command type - I2C commands don't count as runs
# ===============================================================
command_type = cmd.get('command_type', 'uart')
command_name = cmd['command_name']
# Only increment run number for UART commands
if command_type == 'uart':
global_cmd_index += 1
self.current_command_index = global_cmd_index
# ===============================================================
# 3. Emit command started
# ===============================================================
if command_type == 'i2c':
# I2C command - formatted output: [Phase] [I2C] [INFO] message
self.raw_data_received.emit(
f"[{phase_name}] [I2C] [INFO]",
f"Command: {command_name}"
)
else:
# UART command - formatted output: [Phase] [UART] [INFO] message
self.command_started.emit(global_cmd_index, command_name)
self.status_changed.emit(
f"[{phase_name}] Command {cmd_index_in_phase}/{len(phase_commands)} "
f"(Total: {global_cmd_index}/{self.total_commands}): {command_name}"
self.raw_data_received.emit(
f"[{phase_name}] [UART] [INFO]",
f"Run {global_cmd_index}: {command_name}"
)
# ===============================================================
# 3. Execute run via run.py
# 4. Execute command (UART or I2C) via run.py
# ===============================================================
if command_type == 'i2c':
# Execute I2C command (doesn't count as run, no telemetry saved)
# Callback formats as: [Phase] [I2C] [Action] message
status, packet_count, error_msg, updated_zero_ref = execute_i2c_command(
db_connection=self.db_conn,
session_id=self.current_session_id,
session_name=self.session_name,
run_no=0, # Not a run
command_id=cmd['command_id'],
command_name=cmd['command_name'],
operation=cmd.get('operation', 'read'),
device_address=int(cmd.get('device_address', '0x40'), 16),
register=int(cmd.get('register', '0xFE'), 16),
hex_string=cmd.get('hex_string', '02'),
i2c_port=self.i2c_handle,
i2c_zero_ref=self.i2c_zero_reference,
raw_data_callback=lambda action, msg: self.raw_data_received.emit(
f"[{phase_name}] [I2C] [{action}]", msg
)
)
# Update session's zero reference (may have changed if zero command)
self.i2c_zero_reference = updated_zero_ref
else:
# Execute UART command (counts as run, saves telemetry)
# Callback formats as: [Phase] [UART] [Action] message
status, packet_count, error_msg = execute_run(
db_connection=self.db_conn,
session_id=self.current_session_id,
@ -765,28 +859,45 @@ class Session(QObject):
i2c_register=int(self.interface_config['i2c_slave_read_register'], 16) if self.interface_config.get('i2c_slave_read_register') else 0xFE,
stop_timeout_ms=self.interface_config['uart_logger_timeout_ms'],
grace_timeout_ms=self.interface_config['uart_logger_grace_ms'],
raw_data_callback=lambda direction, hex_str: self.raw_data_received.emit(direction, hex_str)
i2c_zero_ref=self.i2c_zero_reference,
raw_data_callback=lambda action, msg: self.raw_data_received.emit(
f"[{phase_name}] [UART] [{action}]", msg
)
)
# ===============================================================
# 4. Handle run result
# 5. Handle command result
# ===============================================================
if status == "error":
# Run failed - abort session (all phases)
# Command failed - abort session (all phases)
if command_type == 'i2c':
self.error_occurred.emit(f"[{phase_name}] I2C command '{command_name}' failed: {error_msg}")
else:
self.error_occurred.emit(f"[{phase_name}] Run {global_cmd_index} failed: {error_msg}")
self._finalize_session('error')
return
# Run succeeded - emit completion
# Command succeeded
if command_type == 'uart':
# UART run - emit completion and update database
self.run_completed.emit(global_cmd_index, packet_count)
self.status_changed.emit(f"[{phase_name}] Run {global_cmd_index} complete: {packet_count} packets")
self.raw_data_received.emit(
f"[{phase_name}] [UART] [INFO]",
f"Complete: {packet_count} packets"
)
# Update total runs in database
self.db_conn.execute("""
UPDATE sessions SET total_runs = ? WHERE session_id = ?
""", (global_cmd_index, self.current_session_id))
self.db_conn.commit()
else:
# I2C command - just log completion (no run counting)
self.raw_data_received.emit(
f"[{phase_name}] [I2C] [INFO]",
f"Complete"
)
# ===============================================================
# 5. Delay between commands (with countdown and queue check)
@ -796,7 +907,7 @@ class Session(QObject):
delay_ms = cmd.get('delay_ms', 3000)
# Only delay if not last command overall
if global_cmd_index < self.total_commands:
if total_commands_executed < self.total_commands:
self._execute_delay(delay_ms)
# Check if pause/stop was queued during delay

@ -468,14 +468,15 @@ class SessionWidget(QWidget):
def _on_command_started(self, command_no: int, command_name: str):
"""Handle command started signal."""
self.executing_label.setText(f"Executing: {command_name}")
total = self.session.total_commands
self.command_label.setText(f"Command: {command_no} / {total}")
self._log_info(f"[{command_no}/{total}] {command_name}")
total = self.session.total_uart_runs
self.command_label.setText(f"Run: {command_no} / {total}")
# Don't log here - already logged via raw_data_received with formatted tags
@pyqtSlot(int, int)
def _on_run_completed(self, run_no: int, packet_count: int):
"""Handle run completed signal."""
self._log_info(f"Run {run_no} complete: {packet_count} packets detected")
# Don't log here - already logged via raw_data_received with formatted tags
pass
@pyqtSlot(int)
def _on_delay_countdown(self, seconds_remaining: int):
@ -531,9 +532,28 @@ class SessionWidget(QWidget):
Handle raw UART data display.
Args:
direction: "TX", "RX", or "ERROR"
direction: "TX", "RX", "ERROR", "INFO" or formatted "[Phase] [Interface] [Action]"
hex_string: Hex bytes (e.g., "EF FE 01 02 03") or error message
"""
# Check if direction is already formatted (starts with '[')
if direction.startswith('['):
# New format: [Phase] [Interface] [Action] message
# Extract action for color coding
if '[TX]' in direction:
color = "green"
elif '[ERROR]' in direction:
color = "red"
elif '[INFO]' in direction:
color = "gray"
else: # RX or other
color = "blue"
# Display full formatted string
self.log_display.append(
f"<span style='color: {color};'>{direction} {hex_string}</span>"
)
else:
# Old format: Simple direction + message
if direction == "TX":
color = "green"
prefix = "→ TX"
@ -550,6 +570,7 @@ class SessionWidget(QWidget):
self.log_display.append(
f"<span style='color: {color};'>[{prefix}] {hex_string}</span>"
)
# Auto-scroll to bottom
self.log_display.verticalScrollBar().setValue(
self.log_display.verticalScrollBar().maximum()

Loading…
Cancel
Save