Working version for thursdays review

main
Kynsight 3 weeks ago
parent e23e2e24b2
commit 2df374f4ef

@ -46,11 +46,7 @@ class ConfigureInterfaceWidget(QWidget):
def _init_ui(self):
"""Initialize UI."""
layout = QVBoxLayout()
title = QLabel("Configure Interface Profiles")
title.setStyleSheet("font-size: 16px; font-weight: bold;")
layout.addWidget(title)
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.addWidget(self._create_list_panel())
splitter.addWidget(self._create_editor_panel())
@ -122,7 +118,7 @@ class ConfigureInterfaceWidget(QWidget):
uart_group = QGroupBox("UART Command Settings")
form = QFormLayout()
# Port with refresh
port_layout = QHBoxLayout()
self.cmd_port = QComboBox()
@ -186,7 +182,7 @@ class ConfigureInterfaceWidget(QWidget):
uart_group = QGroupBox("UART Logger Settings")
form = QFormLayout()
# Port with refresh
port_layout = QHBoxLayout()
self.log_port = QComboBox()

@ -35,11 +35,7 @@ class ConfigureSessionWidget(QWidget):
def _init_ui(self):
"""Initialize UI."""
layout = QVBoxLayout()
title = QLabel("Configure Session Profiles")
title.setStyleSheet("font-size: 16px; font-weight: bold;")
layout.addWidget(title)
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.addWidget(self._create_list_panel())
splitter.addWidget(self._create_editor_panel())

@ -0,0 +1,775 @@
#!/usr/bin/env python3
"""
Database Manager Widget - vzug-e-hinge
=======================================
Comprehensive database management interface for inspecting and managing sessions.
Features:
- Session browser with sort/filter/search
- Session details with editable notes
- Rename, delete, and export sessions
- Delete individual runs
- Session statistics
Author: Kynsight
Version: 1.0.0
"""
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QSplitter,
QTableWidget, QTableWidgetItem, QHeaderView,
QLabel, QPushButton, QLineEdit, QTextEdit,
QGroupBox, QMessageBox, QInputDialog, QFileDialog,
QCheckBox, QListWidget, QListWidgetItem
)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont
from datetime import datetime
import sqlite3
class DatabaseManagerWidget(QWidget):
"""
Database Manager Widget.
Provides comprehensive session management:
- Browse all sessions
- View session details and runs
- Rename, delete, export sessions
- Add notes to sessions
- Delete individual runs
"""
def __init__(self, db_manager):
"""
Initialize Database Manager Widget.
Args:
db_manager: DatabaseManager instance
"""
super().__init__()
self.db_manager = db_manager
self.selected_session_id = None
self._ensure_notes_column()
self._init_ui()
self._load_sessions()
def _ensure_notes_column(self):
"""Ensure notes column exists in sessions table (migration)."""
try:
conn = self.db_manager.get_connection()
# Check if notes column exists
cursor = conn.execute("PRAGMA table_info(sessions)")
columns = [row[1] for row in cursor.fetchall()]
if 'notes' not in columns:
# Add notes column
conn.execute("ALTER TABLE sessions ADD COLUMN notes TEXT DEFAULT ''")
conn.commit()
print("[INFO] Added 'notes' column to sessions table")
except Exception as e:
print(f"[WARN] Failed to add notes column: {e}")
def _init_ui(self):
"""Initialize user interface."""
layout = QVBoxLayout()
self.setLayout(layout)
# Main splitter (left: session browser, right: details)
splitter = QSplitter(Qt.Orientation.Horizontal)
layout.addWidget(splitter)
# Left panel: Session Browser
left_panel = self._create_session_browser()
splitter.addWidget(left_panel)
# Right panel: Session Details
right_panel = self._create_session_details()
splitter.addWidget(right_panel)
# Set splitter ratio (40% left, 60% right)
splitter.setStretchFactor(0, 4)
splitter.setStretchFactor(1, 6)
def _create_session_browser(self):
"""Create session browser panel (left side)."""
panel = QWidget()
layout = QVBoxLayout()
panel.setLayout(layout)
# Search and filter
search_layout = QHBoxLayout()
search_layout.addWidget(QLabel("Search:"))
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("Filter sessions by name...")
self.search_input.textChanged.connect(self._filter_sessions)
search_layout.addWidget(self.search_input)
layout.addLayout(search_layout)
# Session table
self.session_table = QTableWidget()
self.session_table.setColumnCount(5)
self.session_table.setHorizontalHeaderLabels([
"Select", "Session Name", "Date", "Runs", "Size (MB)"
])
# Configure table
header = self.session_table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.ResizeToContents)
header.setSectionResizeMode(4, QHeaderView.ResizeMode.ResizeToContents)
self.session_table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
self.session_table.setSelectionMode(QTableWidget.SelectionMode.SingleSelection)
self.session_table.itemSelectionChanged.connect(self._on_session_selected)
self.session_table.setSortingEnabled(True)
layout.addWidget(self.session_table)
# Batch action buttons
btn_layout = QHBoxLayout()
self.btn_select_all = QPushButton("Select All")
self.btn_select_all.clicked.connect(self._select_all_sessions)
btn_layout.addWidget(self.btn_select_all)
self.btn_clear_selection = QPushButton("Clear")
self.btn_clear_selection.clicked.connect(self._clear_selection)
btn_layout.addWidget(self.btn_clear_selection)
btn_layout.addStretch()
self.btn_delete_selected = QPushButton("Delete Selected")
self.btn_delete_selected.clicked.connect(self._delete_selected_sessions)
btn_layout.addWidget(self.btn_delete_selected)
self.btn_refresh = QPushButton("Refresh")
self.btn_refresh.clicked.connect(self._load_sessions)
btn_layout.addWidget(self.btn_refresh)
layout.addLayout(btn_layout)
return panel
def _create_session_details(self):
"""Create session details panel (right side)."""
panel = QWidget()
layout = QVBoxLayout()
panel.setLayout(layout)
# Session info group
info_group = QGroupBox("Session Information")
info_layout = QVBoxLayout()
info_group.setLayout(info_layout)
# Session ID
self.label_session_id = QLabel("No session selected")
self.label_session_id.setFont(QFont("Arial", 10, QFont.Weight.Bold))
info_layout.addWidget(self.label_session_id)
# Created date
self.label_created = QLabel("")
info_layout.addWidget(self.label_created)
# Notes
notes_layout = QHBoxLayout()
notes_layout.addWidget(QLabel("Notes:"))
self.text_notes = QTextEdit()
self.text_notes.setMaximumHeight(80)
self.text_notes.setPlaceholderText("Add notes or description for this session...")
self.text_notes.textChanged.connect(self._on_notes_changed)
notes_layout.addWidget(self.text_notes)
info_layout.addLayout(notes_layout)
# Session actions
action_layout = QHBoxLayout()
self.btn_rename = QPushButton("Rename Session")
self.btn_rename.clicked.connect(self._rename_session)
action_layout.addWidget(self.btn_rename)
self.btn_delete = QPushButton("Delete Session")
self.btn_delete.clicked.connect(self._delete_session)
action_layout.addWidget(self.btn_delete)
self.btn_export = QPushButton("Export CSV")
self.btn_export.clicked.connect(self._export_session)
action_layout.addWidget(self.btn_export)
action_layout.addStretch()
info_layout.addLayout(action_layout)
layout.addWidget(info_group)
# Runs group
runs_group = QGroupBox("Runs in Session")
runs_layout = QVBoxLayout()
runs_group.setLayout(runs_layout)
self.runs_list = QListWidget()
runs_layout.addWidget(self.runs_list)
# Run actions
run_action_layout = QHBoxLayout()
self.btn_delete_runs = QPushButton("Delete Selected Runs")
self.btn_delete_runs.clicked.connect(self._delete_selected_runs)
run_action_layout.addWidget(self.btn_delete_runs)
run_action_layout.addStretch()
runs_layout.addLayout(run_action_layout)
layout.addWidget(runs_group)
# Statistics group
stats_group = QGroupBox("Session Statistics")
stats_layout = QVBoxLayout()
stats_group.setLayout(stats_layout)
self.label_stats = QLabel("Select a session to view statistics")
self.label_stats.setFont(QFont("Monospace", 9))
stats_layout.addWidget(self.label_stats)
layout.addWidget(stats_group)
# Disable all controls initially
self._set_details_enabled(False)
return panel
def _set_details_enabled(self, enabled: bool):
"""Enable or disable detail panel controls."""
self.text_notes.setEnabled(enabled)
self.btn_rename.setEnabled(enabled)
self.btn_delete.setEnabled(enabled)
self.btn_export.setEnabled(enabled)
self.btn_delete_runs.setEnabled(enabled)
def _load_sessions(self):
"""Load all sessions from database."""
try:
conn = self.db_manager.get_connection()
cursor = conn.execute("""
SELECT
session_id,
session_name,
created_at,
total_runs,
notes,
(SELECT COUNT(*) FROM telemetry_decoded WHERE telemetry_decoded.session_id = sessions.session_id) as data_points
FROM sessions
ORDER BY created_at DESC
""")
sessions = cursor.fetchall()
# Store all sessions for filtering
self.all_sessions = sessions
# Display sessions
self._display_sessions(sessions)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load sessions:\n{str(e)}")
def _display_sessions(self, sessions):
"""Display sessions in table."""
self.session_table.setSortingEnabled(False)
self.session_table.setRowCount(0)
for row_idx, session in enumerate(sessions):
session_id, session_name, created_at, total_runs, notes, data_points = session
self.session_table.insertRow(row_idx)
# Checkbox
checkbox = QCheckBox()
checkbox_widget = QWidget()
checkbox_layout = QHBoxLayout(checkbox_widget)
checkbox_layout.addWidget(checkbox)
checkbox_layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
checkbox_layout.setContentsMargins(0, 0, 0, 0)
self.session_table.setCellWidget(row_idx, 0, checkbox_widget)
# Session name
name_item = QTableWidgetItem(session_name)
name_item.setData(Qt.ItemDataRole.UserRole, session_id)
self.session_table.setItem(row_idx, 1, name_item)
# Date
date_item = QTableWidgetItem(created_at)
self.session_table.setItem(row_idx, 2, date_item)
# Runs
runs_item = QTableWidgetItem(str(total_runs))
runs_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
self.session_table.setItem(row_idx, 3, runs_item)
# Size (estimate: ~100 bytes per data point)
size_mb = (data_points * 100) / (1024 * 1024)
size_item = QTableWidgetItem(f"{size_mb:.2f}")
size_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
self.session_table.setItem(row_idx, 4, size_item)
self.session_table.setSortingEnabled(True)
def _filter_sessions(self):
"""Filter sessions based on search text."""
search_text = self.search_input.text().lower()
if not search_text:
# Show all sessions
self._display_sessions(self.all_sessions)
else:
# Filter sessions
filtered = [s for s in self.all_sessions if search_text in s[1].lower()]
self._display_sessions(filtered)
def _on_session_selected(self):
"""Handle session selection."""
selected_items = self.session_table.selectedItems()
if not selected_items:
self._set_details_enabled(False)
self.selected_session_id = None
self.label_session_id.setText("No session selected")
self.label_created.setText("")
self.text_notes.clear()
self.runs_list.clear()
self.label_stats.setText("Select a session to view statistics")
return
# Get session ID from selected row
row = selected_items[0].row()
session_id = self.session_table.item(row, 1).data(Qt.ItemDataRole.UserRole)
self.selected_session_id = session_id
self._load_session_details(session_id)
self._set_details_enabled(True)
def _load_session_details(self, session_id: str):
"""Load details for selected session."""
try:
conn = self.db_manager.get_connection()
# Get session info
cursor = conn.execute("""
SELECT session_id, session_name, created_at, total_runs, notes
FROM sessions
WHERE session_id = ?
""", (session_id,))
session = cursor.fetchone()
if not session:
return
session_id, session_name, created_at, total_runs, notes = session
# Update UI
self.label_session_id.setText(f"Session: {session_name}")
self.label_created.setText(f"Created: {created_at}")
# Block signals while setting notes to avoid triggering save
self.text_notes.blockSignals(True)
self.text_notes.setPlainText(notes or "")
self.text_notes.blockSignals(False)
# Load runs
self._load_runs(session_id)
# Load statistics
self._load_statistics(session_id)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load session details:\n{str(e)}")
def _load_runs(self, session_id: str):
"""Load runs for selected session."""
try:
conn = self.db_manager.get_connection()
cursor = conn.execute("""
SELECT DISTINCT
t.run_no,
COALESCE(u.command_name, i.command_name, 'Unknown') as command_name,
COUNT(*) as sample_count
FROM telemetry_decoded t
LEFT JOIN uart_commands u ON t.run_command_id = u.command_id
LEFT JOIN i2c_commands i ON t.run_command_id = i.command_id
WHERE t.session_id = ?
GROUP BY t.run_no
ORDER BY t.run_no
""", (session_id,))
runs = cursor.fetchall()
self.runs_list.clear()
for run_no, command_name, sample_count in runs:
item = QListWidgetItem(f"Run {run_no}: {command_name} ({sample_count:,} samples)")
item.setData(Qt.ItemDataRole.UserRole, run_no)
item.setCheckState(Qt.CheckState.Unchecked)
self.runs_list.addItem(item)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load runs:\n{str(e)}")
def _load_statistics(self, session_id: str):
"""Load statistics for selected session."""
try:
conn = self.db_manager.get_connection()
# Get statistics
cursor = conn.execute("""
SELECT
COUNT(DISTINCT run_no) as total_runs,
COUNT(*) as total_samples,
MIN(t_ns) as start_time,
MAX(t_ns) as end_time
FROM telemetry_decoded
WHERE session_id = ?
""", (session_id,))
stats = cursor.fetchone()
if stats and stats[0] > 0:
total_runs, total_samples, start_time, end_time = stats
# Calculate duration
duration_ns = end_time - start_time
duration_s = duration_ns / 1_000_000_000
# Format statistics
stats_text = f"• Total Runs: {total_runs}\n"
stats_text += f"• Total Samples: {total_samples:,}\n"
stats_text += f"• Avg Samples/Run: {total_samples // total_runs:,}\n"
stats_text += f"• Duration: {duration_s:.1f} seconds\n"
self.label_stats.setText(stats_text)
else:
self.label_stats.setText("No data available for this session")
except Exception as e:
self.label_stats.setText(f"Error loading statistics: {str(e)}")
def _on_notes_changed(self):
"""Handle notes text change - auto-save."""
if not self.selected_session_id:
return
notes = self.text_notes.toPlainText()
try:
conn = self.db_manager.get_connection()
conn.execute("""
UPDATE sessions
SET notes = ?
WHERE session_id = ?
""", (notes, self.selected_session_id))
conn.commit()
except Exception as e:
print(f"Failed to save notes: {e}")
def _rename_session(self):
"""Rename selected session."""
if not self.selected_session_id:
return
# Get current name
current_name = self.label_session_id.text().replace("Session: ", "")
# Ask for new name
new_name, ok = QInputDialog.getText(
self,
"Rename Session",
"Enter new session name:",
text=current_name
)
if not ok or not new_name.strip():
return
try:
conn = self.db_manager.get_connection()
# Update session name in sessions table
conn.execute("""
UPDATE sessions
SET session_name = ?
WHERE session_id = ?
""", (new_name.strip(), self.selected_session_id))
# Update session name in telemetry_raw table
conn.execute("""
UPDATE telemetry_raw
SET session_name = ?
WHERE session_id = ?
""", (new_name.strip(), self.selected_session_id))
# Update session name in telemetry_decoded table
conn.execute("""
UPDATE telemetry_decoded
SET session_name = ?
WHERE session_id = ?
""", (new_name.strip(), self.selected_session_id))
conn.commit()
# Reload sessions
self._load_sessions()
# Reload details
self._load_session_details(self.selected_session_id)
QMessageBox.information(self, "Success", "Session renamed successfully!")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to rename session:\n{str(e)}")
def _delete_session(self):
"""Delete selected session."""
if not self.selected_session_id:
return
# Confirm
reply = QMessageBox.question(
self,
"Confirm Delete",
f"Delete session '{self.selected_session_id}'?\n\n"
"This will delete all runs and data for this session.\n"
"This action cannot be undone!",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply != QMessageBox.StandardButton.Yes:
return
try:
conn = self.db_manager.get_connection()
conn.execute("DELETE FROM telemetry_decoded WHERE session_id = ?", (self.selected_session_id,))
conn.execute("DELETE FROM telemetry_raw WHERE session_id = ?", (self.selected_session_id,))
conn.execute("DELETE FROM sessions WHERE session_id = ?", (self.selected_session_id,))
conn.commit()
self.selected_session_id = None
self._load_sessions()
self._set_details_enabled(False)
QMessageBox.information(self, "Success", "Session deleted successfully!")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to delete session:\n{str(e)}")
def _export_session(self):
"""Export selected session to CSV."""
if not self.selected_session_id:
return
# Get save location
filename = f"{self.selected_session_id}_export.csv"
filepath, _ = QFileDialog.getSaveFileName(
self,
"Export Session",
filename,
"CSV Files (*.csv);;All Files (*)"
)
if not filepath:
return
try:
import csv
conn = self.db_manager.get_connection()
# Get all data for session
cursor = conn.execute("""
SELECT
session_id, session_name, run_no, t_ns, time_ms,
motor_current, encoder_value, relative_encoder_value,
v24_pec_diff, pwm,
i2c_raw14, i2c_zero_raw14, i2c_delta_raw14,
i2c_angle_deg, i2c_zero_angle_deg,
angular_velocity, angular_acceleration
FROM telemetry_decoded
WHERE session_id = ?
ORDER BY run_no, t_ns
""", (self.selected_session_id,))
rows = cursor.fetchall()
# Write to CSV
with open(filepath, 'w', newline='') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'session_id', 'session_name', 'run_no', 't_ns', 'time_ms',
'motor_current', 'encoder_value', 'relative_encoder_value',
'v24_pec_diff', 'pwm',
'i2c_raw14', 'i2c_zero_raw14', 'i2c_delta_raw14',
'i2c_angle_deg', 'i2c_zero_angle_deg',
'angular_velocity', 'angular_acceleration'
])
# Data
writer.writerows(rows)
QMessageBox.information(
self,
"Export Complete",
f"Session exported successfully!\n\n"
f"File: {filepath}\n"
f"Rows: {len(rows):,}"
)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to export session:\n{str(e)}")
def _delete_selected_runs(self):
"""Delete selected runs from session."""
if not self.selected_session_id:
return
# Get checked runs
selected_runs = []
for i in range(self.runs_list.count()):
item = self.runs_list.item(i)
if item.checkState() == Qt.CheckState.Checked:
run_no = item.data(Qt.ItemDataRole.UserRole)
selected_runs.append(run_no)
if not selected_runs:
QMessageBox.warning(self, "No Selection", "Please select runs to delete.")
return
# Confirm
reply = QMessageBox.question(
self,
"Confirm Delete",
f"Delete {len(selected_runs)} run(s) from session?\n\n"
"This will permanently delete all data for these runs.\n"
"This action cannot be undone!",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply != QMessageBox.StandardButton.Yes:
return
try:
conn = self.db_manager.get_connection()
for run_no in selected_runs:
conn.execute("""
DELETE FROM telemetry_decoded
WHERE session_id = ? AND run_no = ?
""", (self.selected_session_id, run_no))
conn.execute("""
DELETE FROM telemetry_raw
WHERE session_id = ? AND run_no = ?
""", (self.selected_session_id, run_no))
# Update total_runs in sessions table
cursor = conn.execute("""
SELECT COUNT(DISTINCT run_no)
FROM telemetry_decoded
WHERE session_id = ?
""", (self.selected_session_id,))
new_run_count = cursor.fetchone()[0]
conn.execute("""
UPDATE sessions
SET total_runs = ?
WHERE session_id = ?
""", (new_run_count, self.selected_session_id))
conn.commit()
# Reload details
self._load_session_details(self.selected_session_id)
self._load_sessions()
QMessageBox.information(self, "Success", f"Deleted {len(selected_runs)} run(s) successfully!")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to delete runs:\n{str(e)}")
def _select_all_sessions(self):
"""Select all checkboxes in session table."""
for row in range(self.session_table.rowCount()):
checkbox_widget = self.session_table.cellWidget(row, 0)
checkbox = checkbox_widget.findChild(QCheckBox)
if checkbox:
checkbox.setChecked(True)
def _clear_selection(self):
"""Clear all checkboxes in session table."""
for row in range(self.session_table.rowCount()):
checkbox_widget = self.session_table.cellWidget(row, 0)
checkbox = checkbox_widget.findChild(QCheckBox)
if checkbox:
checkbox.setChecked(False)
def _delete_selected_sessions(self):
"""Delete all checked sessions."""
# Get checked sessions
selected_sessions = []
for row in range(self.session_table.rowCount()):
checkbox_widget = self.session_table.cellWidget(row, 0)
checkbox = checkbox_widget.findChild(QCheckBox)
if checkbox and checkbox.isChecked():
session_id = self.session_table.item(row, 1).data(Qt.ItemDataRole.UserRole)
session_name = self.session_table.item(row, 1).text()
selected_sessions.append((session_id, session_name))
if not selected_sessions:
QMessageBox.warning(self, "No Selection", "Please select sessions to delete.")
return
# Confirm
session_list = "\n".join([f"{name}" for _, name in selected_sessions])
reply = QMessageBox.question(
self,
"Confirm Delete",
f"Delete {len(selected_sessions)} session(s)?\n\n{session_list}\n\n"
"This will permanently delete all data for these sessions.\n"
"This action cannot be undone!",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply != QMessageBox.StandardButton.Yes:
return
try:
conn = self.db_manager.get_connection()
for session_id, _ in selected_sessions:
conn.execute("DELETE FROM telemetry_decoded WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM telemetry_raw WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.commit()
# Vacuum database
self.db_manager.vacuum()
self.selected_session_id = None
self._load_sessions()
self._set_details_enabled(False)
QMessageBox.information(
self,
"Success",
f"Deleted {len(selected_sessions)} session(s) successfully!\n\n"
"Database vacuumed."
)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to delete sessions:\n{str(e)}")

Binary file not shown.

@ -117,10 +117,10 @@ CREATE TABLE IF NOT EXISTS "session_profiles" (
"profile_id" INTEGER PRIMARY KEY AUTOINCREMENT,
"profile_name" TEXT UNIQUE NOT NULL,
"description" TEXT,
"print_command_rx BOOLEAN DEFAULT 0;
"command_sequence" TEXT,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"last_modified" TIMESTAMP
"last_modified" TIMESTAMP,
"print_command_rx" INTEGER DEFAULT 0
);
-- =============================================================================
@ -132,16 +132,17 @@ CREATE TABLE IF NOT EXISTS "sessions" (
"session_name" TEXT NOT NULL,
"session_date" TEXT NOT NULL,
"description" TEXT,
"notes" TEXT DEFAULT '',
"interface_profile_id" INTEGER,
"session_profile_id" INTEGER,
"status" TEXT DEFAULT 'active',
"total_runs" INTEGER DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"ended_at" TIMESTAMP,
FOREIGN KEY ("interface_profile_id") REFERENCES "interface_profiles"("profile_id"),
FOREIGN KEY ("session_profile_id") REFERENCES "session_profiles"("profile_id")
);
@ -592,7 +593,23 @@ class DatabaseManager:
except Exception as e:
print(f"Error checking database size: {e}")
return (0, 0, 'error')
def format_size(self, size_bytes: int) -> str:
"""
Format byte size to human-readable string.
Args:
size_bytes: Size in bytes
Returns:
Formatted string (e.g., "1.5 MB", "2.3 GB")
"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} TB"
def vacuum(self):
"""
Vacuum database (reclaim unused space).

@ -176,7 +176,7 @@ def plot_subplots(
if x_data is None or y_data is None:
ax.text(0.5, 0.5, 'No data', ha='center', va='center')
ax.set_title(f"{data.session_id} - Run {data.run_no} ({data.session_name})")
ax.set_title(f"{data.session_name} ({data.run_no}) [{data.command_name}]")
continue
ax.plot(x_data, y_data, alpha=0.8,
@ -449,6 +449,201 @@ def plot_xy_scatter(
return fig
def plot_envelope(
data_list: List[TelemetryData],
x_column: str,
y_column: str,
xlabel: str,
ylabel: str,
config: Optional[PlotConfig] = None
) -> Figure:
"""
Create envelope plot - shows min/max bounds across all runs.
Useful for visualizing acceptable variation ranges.
Args:
data_list: List of TelemetryData objects
x_column: Column name for X-axis
y_column: Column name for Y-axis
xlabel: X-axis label
ylabel: Y-axis label
config: Plot configuration
Returns:
Matplotlib Figure object
"""
if config is None:
config = PlotConfig()
fig, ax = plt.subplots(figsize=config.figsize, dpi=config.dpi)
if not data_list:
ax.text(0.5, 0.5, 'No data', ha='center', va='center')
return fig
# Find common x-axis (use first run as reference)
reference = data_list[0]
ref_x = getattr(reference, x_column, None)
if ref_x is None:
ax.text(0.5, 0.5, 'No reference data', ha='center', va='center')
return fig
# Collect all Y values at each X point
all_y_values = []
for data in data_list:
x_data = getattr(data, x_column, None)
y_data = getattr(data, y_column, None)
if x_data is None or y_data is None:
continue
# Interpolate to match reference x points
if len(x_data) != len(ref_x) or not np.array_equal(x_data, ref_x):
y_interp = np.interp(ref_x, x_data, y_data)
else:
y_interp = y_data
all_y_values.append(y_interp)
if not all_y_values:
ax.text(0.5, 0.5, 'No data to plot', ha='center', va='center')
return fig
# Calculate statistics
all_y_values = np.array(all_y_values)
y_min = np.min(all_y_values, axis=0)
y_max = np.max(all_y_values, axis=0)
y_mean = np.mean(all_y_values, axis=0)
# Plot envelope
ax.fill_between(ref_x, y_min, y_max, alpha=0.3, color='lightblue',
label='Min/Max Range')
ax.plot(ref_x, y_mean, '-', color='blue', linewidth=2,
label='Mean', alpha=0.8)
ax.plot(ref_x, y_min, '--', color='gray', linewidth=1,
label='Min', alpha=0.6)
ax.plot(ref_x, y_max, '--', color='gray', linewidth=1,
label='Max', alpha=0.6)
# Formatting
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(ylabel, fontsize=config.label_fontsize)
ax.set_title(f"{config.title} - Envelope Analysis", fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
ax.legend(loc='best', fontsize=config.legend_fontsize)
fig.tight_layout()
return fig
def plot_cumulative_deviation(
data_list: List[TelemetryData],
x_column: str,
y_column: str,
xlabel: str,
ylabel: str,
reference_index: int = 0,
config: Optional[PlotConfig] = None
) -> Figure:
"""
Create cumulative deviation plot - shows accumulated error over time.
Useful for detecting slow, compounding drift.
Args:
data_list: List of TelemetryData objects
x_column: Column name for X-axis
y_column: Column name for Y-axis
xlabel: X-axis label
ylabel: Y-axis label
reference_index: Index of reference run (default: 0)
config: Plot configuration
Returns:
Matplotlib Figure object
"""
if config is None:
config = PlotConfig()
if reference_index >= len(data_list):
reference_index = 0
fig, ax = plt.subplots(figsize=config.figsize, dpi=config.dpi)
reference = data_list[reference_index]
ref_x = getattr(reference, x_column, None)
ref_y = getattr(reference, y_column, None)
if ref_x is None or ref_y is None:
ax.text(0.5, 0.5, 'No reference data', ha='center', va='center')
return fig
# Plot reference as zero line
ax.axhline(y=0, color='black', linestyle='--',
label=f'Reference: {reference.session_name} ({reference.run_no}) [{reference.command_name}]')
# Set color cycle
ax.set_prop_cycle(color=plt.cm.tab10.colors)
# Plot cumulative deviations
for idx, data in enumerate(data_list):
if idx == reference_index:
continue
x_data = getattr(data, x_column, None)
y_data = getattr(data, y_column, None)
if x_data is None or y_data is None:
continue
# Interpolate to match reference x points
if len(x_data) != len(ref_x) or not np.array_equal(x_data, ref_x):
y_interp = np.interp(ref_x, x_data, y_data)
else:
y_interp = y_data
# Calculate deviation
deviation = y_interp - ref_y
# Calculate cumulative sum
cumulative_deviation = np.cumsum(deviation)
# Label format
label = f"{data.session_name} ({data.run_no}) [{data.command_name}]"
ax.plot(ref_x, cumulative_deviation, label=label, alpha=0.8,
linestyle=config.linestyle, marker=config.marker,
markersize=config.markersize)
# Formatting
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(f"Cumulative Deviation in {ylabel}", fontsize=config.label_fontsize)
ax.set_title(f"{config.title} - Cumulative Drift Analysis", fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1])
else:
fig.tight_layout()
else:
fig.tight_layout()
return fig
# =============================================================================
# Export Functions
# =============================================================================
@ -498,26 +693,27 @@ def export_csv(
writer = csv.writer(f)
# Header: metadata + X column + selected Y columns
header = ['session_id', 'session_name', 'run_no', x_column] + y_columns
header = ['session_id', 'session_name', 'run_no', 'command_name', x_column] + y_columns
writer.writerow(header)
# Data rows
for data in data_list:
# Get X data
x_data = getattr(data, x_column, None)
if x_data is None:
continue
# Get length from X column
length = len(x_data)
# Write each data point
for i in range(length):
row = [
data.session_id,
data.session_name,
data.run_no,
data.command_name,
x_data[i]
]

@ -7,7 +7,7 @@ GUI for visualizing telemetry data from SQLite database.
Features:
- Session/run tree selector
- Data series checkboxes
- Plot type selector (overlay, subplots, drift)
- Plot type selector (overlay, drift, multi-series)
- Embedded matplotlib canvas
- Zoom/pan controls
- Export PNG/CSV
@ -22,11 +22,12 @@ from PyQt6.QtWidgets import (
QLabel, QPushButton, QComboBox,
QGroupBox, QTreeWidget, QTreeWidgetItem,
QCheckBox, QFileDialog, QMessageBox, QSplitter,
QSizePolicy
QSizePolicy, QApplication
)
from PyQt6.QtCore import Qt, pyqtSignal
from PyQt6.QtGui import QFont
from typing import Optional
from datetime import datetime
try:
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
@ -55,10 +56,11 @@ from graph_table_query import (
from .graph_core import (
PlotConfig,
plot_overlay,
plot_subplots,
plot_comparison,
plot_multi_series,
plot_xy_scatter,
plot_envelope,
plot_cumulative_deviation,
export_png,
export_csv
)
@ -152,13 +154,8 @@ class GraphWidget(QWidget):
# 2. Plot controls (right after database!)
plot_controls = self._create_plot_controls()
layout.addWidget(plot_controls)
# 3. Refresh button
btn_refresh = QPushButton("🔄 Refresh Sessions")
btn_refresh.clicked.connect(self._load_sessions)
layout.addWidget(btn_refresh)
# 4. Data series selection (MOVED UP - before Sessions!)
# 3. Data series selection (MOVED UP - before Sessions!)
self.series_group = self._create_series_selector()
layout.addWidget(self.series_group)
@ -168,30 +165,54 @@ class GraphWidget(QWidget):
tree_group.setLayout(tree_layout)
self.tree_sessions = QTreeWidget()
self.tree_sessions.setHeaderLabels(["Session / Run", "Samples"])
self.tree_sessions.setHeaderLabels(["Session / Run", "Date"])
self.tree_sessions.itemChanged.connect(self._on_selection_changed)
# Set column widths - give more space to Session/Run column
self.tree_sessions.setColumnWidth(0, 200) # Session/Run column
self.tree_sessions.setColumnWidth(1, 120) # Date column (wider for dd/mm/yy HH:mm)
# Left-align date column header (default)
self.tree_sessions.headerItem().setTextAlignment(1, Qt.AlignmentFlag.AlignLeft)
tree_layout.addWidget(self.tree_sessions)
# Refresh button under tree
btn_refresh_sessions = QPushButton("🔄 Refresh Sessions")
btn_refresh_sessions.clicked.connect(self._load_sessions)
tree_layout.addWidget(btn_refresh_sessions)
# Reference run selector
ref_layout = QHBoxLayout()
self.ref_run_label = QLabel("Reference Run:")
ref_layout.addWidget(self.ref_run_label)
self.combo_reference_run = QComboBox()
self.combo_reference_run.setToolTip("Select which run to use as reference/baseline for drift analysis")
ref_layout.addWidget(self.combo_reference_run)
tree_layout.addLayout(ref_layout)
# Start disabled (default plot type is Overlay, which doesn't need reference)
self.ref_run_label.setEnabled(False)
self.combo_reference_run.setEnabled(False)
# Buttons under tree
btn_row = QHBoxLayout()
btn_generate = QPushButton("📊 Generate Plot")
btn_generate.clicked.connect(self._generate_plot)
btn_row.addWidget(btn_generate)
btn_export_png = QPushButton("💾 PNG")
btn_export_png.clicked.connect(self._export_png)
btn_row.addWidget(btn_export_png)
btn_export_csv = QPushButton("📄 CSV")
btn_export_csv.clicked.connect(self._export_csv)
btn_row.addWidget(btn_export_csv)
tree_layout.addLayout(btn_row)
layout.addWidget(tree_group)
layout.addStretch()
layout.addWidget(tree_group, 1) # Stretch factor 1: expand to fill available space
return panel
def _create_series_selector(self):
@ -201,52 +222,43 @@ class GraphWidget(QWidget):
group.setLayout(layout)
# Simple vertical list - no grouping
self.check_t_ns = QCheckBox("t_ns")
self.check_t_ns = QCheckBox("Time (ns)")
layout.addWidget(self.check_t_ns)
self.check_time_ms = QCheckBox("time_ms")
self.check_time_ms = QCheckBox("Time (ms)")
layout.addWidget(self.check_time_ms)
self.check_motor_current = QCheckBox("motor_current")
self.check_motor_current = QCheckBox("Motor Current (A)")
self.check_motor_current.setChecked(True) # Default
layout.addWidget(self.check_motor_current)
self.check_encoder_value = QCheckBox("encoder_value")
self.check_encoder_value = QCheckBox("Encoder Value")
self.check_encoder_value.setChecked(True) # Default
layout.addWidget(self.check_encoder_value)
self.check_relative_encoder_value = QCheckBox("relative_encoder_value")
self.check_relative_encoder_value = QCheckBox("Relative Encoder")
self.check_relative_encoder_value.setChecked(True) # Default
layout.addWidget(self.check_relative_encoder_value)
self.check_v24_pec_diff = QCheckBox("v24_pec_diff")
self.check_v24_pec_diff = QCheckBox("V24 PEC Diff (V)")
layout.addWidget(self.check_v24_pec_diff)
self.check_pwm = QCheckBox("pwm")
self.check_pwm = QCheckBox("PWM (%)")
self.check_pwm.setChecked(True) # Default
layout.addWidget(self.check_pwm)
self.check_i2c_raw14 = QCheckBox("i2c_raw14")
layout.addWidget(self.check_i2c_raw14)
self.check_i2c_zero_raw14 = QCheckBox("i2c_zero_raw14")
layout.addWidget(self.check_i2c_zero_raw14)
self.check_i2c_delta_raw14 = QCheckBox("i2c_delta_raw14")
layout.addWidget(self.check_i2c_delta_raw14)
self.check_i2c_angle_deg = QCheckBox("i2c_angle_deg")
self.check_i2c_angle_deg = QCheckBox("Angle (°)")
self.check_i2c_angle_deg.setChecked(True) # Default
layout.addWidget(self.check_i2c_angle_deg)
self.check_i2c_zero_angle_deg = QCheckBox("i2c_zero_angle_deg")
self.check_i2c_zero_angle_deg = QCheckBox("Zero Angle (°)")
layout.addWidget(self.check_i2c_zero_angle_deg)
self.check_angular_velocity = QCheckBox("angular_velocity")
self.check_angular_velocity = QCheckBox("Angular Velocity (°/s)")
self.check_angular_velocity.setChecked(True) # Default
layout.addWidget(self.check_angular_velocity)
self.check_angular_acceleration = QCheckBox("angular_acceleration")
self.check_angular_acceleration = QCheckBox("Angular Acceleration (°/s²)")
layout.addWidget(self.check_angular_acceleration)
return group
@ -271,6 +283,13 @@ class GraphWidget(QWidget):
placeholder_label.setStyleSheet("color: #888888; font-size: 14pt;")
layout.addWidget(placeholder_label, 1) # stretch=1 to fill space
# Coordinate display label (below canvas) - Qt widget for better performance
self.coord_label = QLabel("")
self.coord_label.setFont(QFont("Monospace", 9))
self.coord_label.setStyleSheet("padding: 3px; background-color: #f0f0f0; border-top: 1px solid #ccc;")
self.coord_label.setMinimumHeight(25)
layout.addWidget(self.coord_label)
return panel
def _create_plot_controls(self):
@ -295,16 +314,11 @@ class GraphWidget(QWidget):
self.combo_line_type.addItems([
"Line",
"Line + Markers",
"Markers Only",
"Steps"
"Markers Only"
])
line_type_row.addWidget(self.combo_line_type)
layout.addLayout(line_type_row)
# Separate window checkbox
self.check_separate_window = QCheckBox("Open in separate window")
layout.addWidget(self.check_separate_window)
# X-axis selector (for time series mode)
self.xaxis_row = QHBoxLayout()
self.xaxis_label = QLabel("X-Axis:")
@ -321,13 +335,19 @@ class GraphWidget(QWidget):
self.combo_plot_type = QComboBox()
self.combo_plot_type.addItems([
"Overlay",
"Subplots",
"Drift Comparison",
"Multi-Series"
"Multi-Series",
"Envelope Plot",
"Cumulative Deviation"
])
self.combo_plot_type.currentTextChanged.connect(self._on_plot_type_changed)
self.type_row.addWidget(self.combo_plot_type)
layout.addLayout(self.type_row)
# Separate window checkbox
self.check_separate_window = QCheckBox("Open in separate window")
layout.addWidget(self.check_separate_window)
# XY plot selectors (hidden by default)
self.xy_row = QHBoxLayout()
self.xy_x_label = QLabel("X:")
@ -395,7 +415,14 @@ class GraphWidget(QWidget):
# Show data series panel
self.series_group.setVisible(True)
def _on_plot_type_changed(self, plot_type: str):
"""Handle plot type change - enable/disable reference run selector."""
# Enable reference selector only for modes that need it
needs_reference = plot_type in ["Drift Comparison", "Cumulative Deviation"]
self.ref_run_label.setEnabled(needs_reference)
self.combo_reference_run.setEnabled(needs_reference)
# =========================================================================
# Database Connection
# =========================================================================
@ -464,10 +491,14 @@ class GraphWidget(QWidget):
return
for session in self.sessions:
# Create session item with date
# Create session item - date in second column
session_item = QTreeWidgetItem(self.tree_sessions)
session_item.setText(0, f"{session.session_id} ({session.created_at})")
session_item.setText(1, f"{session.run_count} runs")
session_item.setText(0, session.session_name)
# Format date as dd/mm/yy HH:mm
formatted_date = self._format_date(session.created_at)
session_item.setText(1, formatted_date)
session_item.setFlags(session_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
session_item.setCheckState(0, Qt.CheckState.Unchecked)
session_item.setData(0, Qt.ItemDataRole.UserRole, session.session_id)
@ -478,17 +509,43 @@ class GraphWidget(QWidget):
for run in runs:
run_item = QTreeWidgetItem(session_item)
run_item.setText(0, f"Run {run.run_number}: {run.command_name}")
run_item.setText(1, f"{run.sample_count} samples")
# No sample information in column 1 for runs
run_item.setFlags(run_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
run_item.setCheckState(0, Qt.CheckState.Unchecked)
run_item.setData(0, Qt.ItemDataRole.UserRole, (session.session_id, run.run_number))
self.tree_sessions.expandAll()
self.tree_sessions.collapseAll()
except Exception as e:
print(f"[ERROR] Failed to load sessions: {e}")
import traceback
traceback.print_exc()
def _format_date(self, date_string: str) -> str:
"""
Format date string to dd/mm/yy HH:mm.
Args:
date_string: Date in format "YYYY-MM-DD HH:MM:SS" or similar
Returns:
Formatted date string "dd/mm/yy HH:mm" or original if parsing fails
"""
if not date_string:
return ""
try:
# Try parsing common SQLite datetime format
dt = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
return dt.strftime("%d/%m/%y %H:%M")
except ValueError:
try:
# Try ISO format with microseconds
dt = datetime.strptime(date_string.split('.')[0], "%Y-%m-%d %H:%M:%S")
return dt.strftime("%d/%m/%y %H:%M")
except ValueError:
# Return original if can't parse
return date_string
def _on_selection_changed(self, item: QTreeWidgetItem, column: int):
"""Handle checkbox changes in tree."""
# Update parent/child checkboxes
@ -506,7 +563,57 @@ class GraphWidget(QWidget):
for i in range(parent.childCount())
)
parent.setCheckState(0, Qt.CheckState.Checked if all_checked else Qt.CheckState.Unchecked)
# Update reference run selector
self._update_reference_selector()
def _update_reference_selector(self):
"""Update reference run dropdown based on selected runs."""
# Get currently selected runs
selected_runs = self._get_selected_runs()
# Clear and repopulate reference selector
self.combo_reference_run.clear()
if not selected_runs:
self.combo_reference_run.addItem("(No runs selected)", None)
# Don't change enabled state - let plot type control it
return
# Populate with selected runs
for idx, (session_id, run_no) in enumerate(selected_runs):
# Find session name from tree
session_name = self._get_session_name(session_id)
# Get command name for the run
command_name = self._get_command_name(session_id, run_no)
# Format: "Session Name - Run #: Command"
label = f"{session_name} - Run {run_no}: {command_name}"
self.combo_reference_run.addItem(label, idx) # Store index as data
# Enable only if current plot type needs reference
plot_type = self.combo_plot_type.currentText()
needs_reference = plot_type in ["Drift Comparison", "Cumulative Deviation"]
self.ref_run_label.setEnabled(needs_reference)
self.combo_reference_run.setEnabled(needs_reference)
def _get_session_name(self, session_id: str) -> str:
"""Get session name for a session_id."""
for session in self.sessions:
if session.session_id == session_id:
return session.session_name
return session_id # Fallback to ID
def _get_command_name(self, session_id: str, run_no: int) -> str:
"""Get command name for a specific run."""
if self.adapter:
runs = self.adapter.get_runs(session_id)
for run in runs:
if run.run_number == run_no:
return run.command_name
return "Unknown"
def _get_selected_runs(self):
"""Get list of selected (session_id, run_number) tuples."""
selected = []
@ -558,56 +665,53 @@ class GraphWidget(QWidget):
series = []
labels = []
# Time axes
if self.check_t_ns.isChecked():
series.append('t_ns')
labels.append('Time (ns)')
if self.check_time_ms.isChecked():
series.append('time_ms')
labels.append('Time (ms)')
# UART data
if self.check_motor_current.isChecked():
series.append('motor_current')
labels.append('Motor Current')
labels.append('Motor Current (A)')
if self.check_encoder_value.isChecked():
series.append('encoder_value')
labels.append('Encoder Value')
if self.check_relative_encoder_value.isChecked():
series.append('relative_encoder_value')
labels.append('Relative Encoder')
if self.check_v24_pec_diff.isChecked():
series.append('v24_pec_diff')
labels.append('V24 PEC Diff')
labels.append('V24 PEC Diff (V)')
if self.check_pwm.isChecked():
series.append('pwm')
labels.append('PWM')
labels.append('PWM (%)')
# I2C data
if self.check_i2c_raw14.isChecked():
series.append('i2c_raw14')
labels.append('I2C Raw (14-bit)')
if self.check_i2c_zero_raw14.isChecked():
series.append('i2c_zero_raw14')
labels.append('I2C Zero Raw')
if self.check_i2c_delta_raw14.isChecked():
series.append('i2c_delta_raw14')
labels.append('I2C Delta Raw')
if self.check_i2c_angle_deg.isChecked():
series.append('i2c_angle_deg')
labels.append('Angle (degrees)')
labels.append('Angle (°)')
if self.check_i2c_zero_angle_deg.isChecked():
series.append('i2c_zero_angle_deg')
labels.append('Zero Angle (degrees)')
labels.append('Zero Angle (°)')
# Derived data
if self.check_angular_velocity.isChecked():
series.append('angular_velocity')
labels.append('Angular Velocity')
labels.append('Angular Velocity (°/s)')
if self.check_angular_acceleration.isChecked():
series.append('angular_acceleration')
labels.append('Angular Acceleration')
labels.append('Angular Acceleration (°/s²)')
return series, labels
@ -631,7 +735,7 @@ class GraphWidget(QWidget):
if not self.loaded_data:
QMessageBox.warning(self, "Load Error", "Failed to load data from database")
return
# Get plot mode
plot_mode = self.combo_plot_mode.currentText()
@ -678,9 +782,9 @@ class GraphWidget(QWidget):
xlabel = self._get_axis_label(x_col)
plot_type = self.combo_plot_type.currentText()
if plot_type == "Overlay":
# FIXED: All series on SAME plot (not stacked)
# All series on SAME plot with multiple Y-axes
self.figure = self._plot_overlay_all(
self.loaded_data,
x_col,
@ -689,30 +793,22 @@ class GraphWidget(QWidget):
labels,
config
)
elif plot_type == "Subplots":
# One subplot per run (first series only)
self.figure = plot_subplots(
self.loaded_data,
x_col,
series[0],
xlabel,
labels[0],
config
)
elif plot_type == "Drift Comparison":
# Drift comparison (first series only)
# Drift comparison (first series only) with selected reference
ref_index = self.combo_reference_run.currentData()
if ref_index is None:
ref_index = 0
self.figure = plot_comparison(
self.loaded_data,
x_col,
series[0],
xlabel,
labels[0],
reference_index=0,
reference_index=ref_index,
config=config
)
elif plot_type == "Multi-Series":
# Multi-series stacked subplots
self.figure = plot_multi_series(
@ -723,7 +819,33 @@ class GraphWidget(QWidget):
labels,
config
)
elif plot_type == "Envelope Plot":
# Envelope plot (first series only)
self.figure = plot_envelope(
self.loaded_data,
x_col,
series[0],
xlabel,
labels[0],
config=config
)
elif plot_type == "Cumulative Deviation":
# Cumulative deviation (first series only) with selected reference
ref_index = self.combo_reference_run.currentData()
if ref_index is None:
ref_index = 0
self.figure = plot_cumulative_deviation(
self.loaded_data,
x_col,
series[0],
xlabel,
labels[0],
reference_index=ref_index,
config=config
)
# Display plot
if self.check_separate_window.isChecked():
# Open in separate window
@ -741,78 +863,109 @@ class GraphWidget(QWidget):
def _plot_overlay_all(self, data_list, x_col, y_columns, xlabel, ylabels, config):
"""
Plot all series from all runs on SAME axes (true overlay).
Fixes the stacking issue - everything goes on one plot.
Plot all series from all runs on SAME plot with multiple Y-axes.
Each series gets its own Y-axis with matching color.
"""
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=config.figsize, dpi=config.dpi)
# Set color cycle for distinguishable colors
ax.set_prop_cycle(color=plt.cm.tab10.colors) # 20 colors
fig, ax1 = plt.subplots(figsize=config.figsize, dpi=config.dpi)
# Get line type settings
line_type = self.combo_line_type.currentText()
linestyle, marker, markersize = self._get_line_style(line_type)
# Track if any data was plotted
# Color palette for series (each series gets a base color)
import matplotlib.colors as mcolors
# Base colors for each series (used for Y-axis labels and lines)
series_base_colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown']
# Create axes for each series
axes = [ax1]
for i in range(1, len(y_columns)):
if i == 1:
# Second series: right Y-axis
axes.append(ax1.twinx())
else:
# Third+ series: offset right Y-axes
ax_new = ax1.twinx()
ax_new.spines['right'].set_position(('outward', 60 * (i - 1)))
axes.append(ax_new)
# Track legend handles/labels
all_handles = []
all_labels = []
plotted_any = False
# Plot each combination of run × series
for data in data_list:
x_data = getattr(data, x_col, None)
if x_data is None:
continue
for y_col, ylabel in zip(y_columns, ylabels):
# Plot each series on its own axis
for series_idx, (y_col, ylabel) in enumerate(zip(y_columns, ylabels)):
ax = axes[series_idx]
base_color = series_base_colors[series_idx % len(series_base_colors)]
# Create color variations for different runs within this series
# Convert base color to RGB and create lighter/darker variations
base_rgb = mcolors.to_rgb(base_color)
for run_idx, data in enumerate(data_list):
x_data = getattr(data, x_col, None)
y_data = getattr(data, y_col, None)
if y_data is None:
if x_data is None or y_data is None:
continue
plotted_any = True
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(x_data, y_data, label=label, alpha=0.8,
linestyle=linestyle, marker=marker, markersize=markersize)
# Vary alpha/intensity for different runs of same series
# First run uses full color, subsequent runs use lighter shades
if len(data_list) == 1:
line_alpha = 0.9
else:
# Alpha varies from 0.9 to 0.5 across runs
line_alpha = 0.9 - (run_idx * 0.4 / max(1, len(data_list) - 1))
line, = ax.plot(x_data, y_data, label=label, alpha=line_alpha,
linestyle=linestyle, marker=marker, markersize=markersize,
color=base_color)
all_handles.append(line)
all_labels.append(label)
# Set Y-axis label and ticks with matching color
ax.set_ylabel(ylabel, fontsize=config.label_fontsize, color=base_color)
ax.tick_params(axis='y', labelsize=config.tick_fontsize, labelcolor=base_color)
# Grid only on first axis
if series_idx == 0 and config.grid:
ax.grid(True, alpha=0.3)
# Check if any data was plotted
if not plotted_any:
ax.text(0.5, 0.5, 'No data available for selected series\n(columns may not exist in database)',
ha='center', va='center', transform=ax.transAxes,
fontsize=config.label_fontsize, color='red')
# Formatting
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel("Value", fontsize=config.label_fontsize)
ax.set_title(config.title, fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
# Legend at bottom, outside plot, 2 columns
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1]) # Leave space for legend
else:
# No data plotted - just tight layout without legend space
fig.tight_layout()
ax1.text(0.5, 0.5, 'No data available for selected series\n(columns may not exist in database)',
ha='center', va='center', transform=ax1.transAxes,
fontsize=config.label_fontsize, color='red')
# Formatting for main axis
ax1.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax1.set_title(config.title, fontsize=config.title_fontsize)
ax1.tick_params(axis='x', labelsize=config.tick_fontsize)
if config.legend and all_handles:
# Combined legend at bottom
fig.legend(all_handles, all_labels, loc='upper center',
bbox_to_anchor=(0.5, -0.02), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1])
else:
fig.tight_layout()
return fig
def _get_line_style(self, line_type):
"""
Get matplotlib line style parameters.
Returns:
(linestyle, marker, markersize)
"""
@ -822,8 +975,6 @@ class GraphWidget(QWidget):
return '-', 'o', 3
elif line_type == "Markers Only":
return '', 'o', 4
elif line_type == "Steps":
return 'steps-post', None, 0
else:
return '-', None, 0
@ -881,12 +1032,22 @@ class GraphWidget(QWidget):
self.plot_layout.addWidget(self.canvas, 1) # stretch=1 to take all available space
self.plot_layout.addWidget(self.toolbar) # toolbar stays at natural height
# Recreate coordinate display label (below canvas)
self.coord_label = QLabel("")
self.coord_label.setFont(QFont("Monospace", 9))
self.coord_label.setStyleSheet("padding: 3px; background-color: #f0f0f0; border-top: 1px solid #ccc;")
self.coord_label.setMinimumHeight(25)
self.plot_layout.addWidget(self.coord_label)
# Force canvas to resize to fill available space
self.canvas.updateGeometry()
# Draw the canvas
self.canvas.draw()
# Setup crosshair cursors (optimized for Raspberry Pi/Wayland)
self._setup_crosshair(figure)
except Exception as e:
print(f"[ERROR] Failed to update canvas: {e}")
import traceback
@ -896,44 +1057,143 @@ class GraphWidget(QWidget):
error_label.setStyleSheet("color: red;")
self.plot_layout.addWidget(error_label)
def _setup_crosshair(self, figure):
"""Setup interactive crosshair cursors for the main canvas."""
self._setup_crosshair_for_canvas(figure, self.canvas)
def _show_plot_window(self, figure):
"""Show plot in separate window."""
from PyQt6.QtWidgets import QDialog, QVBoxLayout
dialog = QDialog(self)
dialog.setWindowTitle("Telemetry Plot")
dialog.resize(1000, 700)
layout = QVBoxLayout()
dialog.setLayout(layout)
# Canvas and toolbar
canvas = FigureCanvas(figure)
toolbar = NavigationToolbar(canvas, dialog)
layout.addWidget(canvas)
layout.addWidget(toolbar)
# Setup crosshair for separate window (optimized)
self._setup_crosshair_for_canvas(figure, canvas)
dialog.show()
def _setup_crosshair_for_canvas(self, figure, canvas):
"""Setup crosshair for a specific canvas - OPTIMIZED with Qt label for coordinates."""
try:
import time
axes_list = figure.get_axes()
if not axes_list:
return
crosshair_lines = {}
axes_info = {} # Store Y-axis label for each axis
background = None
last_update_time = [0] # Use list to allow modification in nested function
# Create crosshairs and collect axis info - OPTIMIZED: thinner lines, less alpha
for idx, ax in enumerate(axes_list):
vline = ax.axvline(x=0, color='gray', linestyle='--', linewidth=0.5, visible=False, alpha=0.5)
hline = ax.axhline(y=0, color='gray', linestyle='--', linewidth=0.5, visible=False, alpha=0.5)
crosshair_lines[ax] = (vline, hline)
# Extract Y-axis label
ylabel = ax.get_ylabel()
axes_info[ax] = ylabel
# Draw canvas once and cache the background
canvas.draw()
background = canvas.copy_from_bbox(figure.bbox)
def on_mouse_move(event):
nonlocal background
# OPTIMIZATION: Throttle updates to every 50ms for better performance on Raspberry Pi
current_time = time.time()
if current_time - last_update_time[0] < 0.05: # 50ms throttle
return
last_update_time[0] = current_time
if background is None:
return
if event.inaxes and event.x is not None and event.y is not None:
# Restore background
canvas.restore_region(background)
# Show crosshair on the axis under cursor
vline, hline = crosshair_lines[event.inaxes]
vline.set_xdata([event.xdata])
hline.set_ydata([event.ydata])
vline.set_visible(True)
hline.set_visible(True)
event.inaxes.draw_artist(vline)
event.inaxes.draw_artist(hline)
# Blit the updated region (only crosshair lines, no text!)
canvas.blit(figure.bbox)
# Update Qt label with coordinates - MUCH FASTER than matplotlib text!
coord_text = f'X: {event.xdata:.1f} | '
# Get Y values for all axes
y_values = []
for ax in axes_list:
try:
# Convert display coordinates to data coordinates for this axis
display_coords = (event.x, event.y)
data_coords = ax.transData.inverted().transform(display_coords)
y_val = data_coords[1]
ylabel = axes_info.get(ax, '')
if ylabel:
y_values.append(f'{ylabel}: {y_val:.1f}')
except:
pass
coord_text += ' | '.join(y_values)
self.coord_label.setText(coord_text)
else:
# Mouse left plot area - restore clean background and clear label
canvas.restore_region(background)
canvas.blit(figure.bbox)
self.coord_label.setText("")
# Update background after any resize
def on_draw(event):
nonlocal background
background = canvas.copy_from_bbox(figure.bbox)
canvas.mpl_connect('motion_notify_event', on_mouse_move)
canvas.mpl_connect('draw_event', on_draw)
except Exception as e:
print(f"[WARN] Failed to setup crosshair: {e}")
def _get_axis_label(self, column: str) -> str:
"""Get human-readable label for column."""
labels = {
't_ns': 'Time (ns)',
'time_ms': 'Time (ms)',
'motor_current': 'Motor Current',
'motor_current': 'Motor Current (A)',
'encoder_value': 'Encoder Value',
'relative_encoder_value': 'Relative Encoder',
'v24_pec_diff': 'V24 PEC Diff',
'pwm': 'PWM',
'i2c_raw14': 'I2C Raw (14-bit)',
'i2c_zero_raw14': 'I2C Zero Raw',
'i2c_delta_raw14': 'I2C Delta Raw',
'i2c_angle_deg': 'Angle (degrees)',
'i2c_zero_angle_deg': 'Zero Angle (degrees)',
'angular_velocity': 'Angular Velocity'
'v24_pec_diff': 'V24 PEC Diff (V)',
'pwm': 'PWM (%)',
'i2c_angle_deg': 'Angle (°)',
'i2c_zero_angle_deg': 'Zero Angle (°)',
'angular_velocity': 'Angular Velocity (°/s)',
'angular_acceleration': 'Angular Acceleration (°/s²)'
}
return labels.get(column, column)
return labels.get(column, column.replace('_', ' ').title())
# =========================================================================
# Export

@ -36,6 +36,7 @@ import numpy as np
class SessionInfo:
"""Session metadata."""
session_id: str
session_name: str
created_at: str
description: str
run_count: int
@ -196,8 +197,9 @@ class SQLiteAdapter(DataAdapter):
cursor = self._conn.cursor()
cursor.execute("""
SELECT
SELECT
s.session_id,
s.session_name,
s.created_at,
s.description,
COUNT(DISTINCT t.run_no) as run_count
@ -206,11 +208,12 @@ class SQLiteAdapter(DataAdapter):
GROUP BY s.session_id
ORDER BY s.created_at DESC
""")
sessions = []
for row in cursor.fetchall():
sessions.append(SessionInfo(
session_id=row['session_id'],
session_name=row['session_name'] or row['session_id'],
created_at=row['created_at'] or '',
description=row['description'] or '',
run_count=row['run_count'] or 0
@ -594,20 +597,17 @@ def get_column_label(column: str) -> str:
labels = {
't_ns': 'Time (ns)',
'time_ms': 'Time (ms)',
'motor_current': 'Motor Current',
'motor_current': 'Motor Current (A)',
'encoder_value': 'Encoder Value',
'relative_encoder_value': 'Relative Encoder',
'v24_pec_diff': 'V24 PEC Diff',
'pwm': 'PWM',
'i2c_raw14': 'I2C Raw (14-bit)',
'i2c_zero_raw14': 'I2C Zero Raw',
'i2c_delta_raw14': 'I2C Delta Raw',
'i2c_angle_deg': 'Angle (degrees)',
'i2c_zero_angle_deg': 'Zero Angle (degrees)',
'angular_velocity': 'Angular Velocity (deg/s)',
'angular_acceleration': 'Angular Acceleration (deg/s²)'
'v24_pec_diff': 'V24 PEC Diff (V)',
'pwm': 'PWM (%)',
'i2c_angle_deg': 'Angle (°)',
'i2c_zero_angle_deg': 'Zero Angle (°)',
'angular_velocity': 'Angular Velocity (°/s)',
'angular_acceleration': 'Angular Acceleration (°/s²)'
}
return labels.get(column, column)
return labels.get(column, column.replace('_', ' ').title())
def get_column_groups() -> Dict[str, List[str]]:
@ -619,10 +619,9 @@ def get_column_groups() -> Dict[str, List[str]]:
"""
return {
'Time': ['t_ns', 'time_ms'],
'UART': ['motor_current', 'encoder_value', 'relative_encoder_value',
'UART': ['motor_current', 'encoder_value', 'relative_encoder_value',
'v24_pec_diff', 'pwm'],
'I2C': ['i2c_raw14', 'i2c_zero_raw14', 'i2c_delta_raw14',
'i2c_angle_deg', 'i2c_zero_angle_deg'],
'I2C': ['i2c_angle_deg', 'i2c_zero_angle_deg'],
'Derived': ['angular_velocity', 'angular_acceleration']
}

@ -26,10 +26,11 @@ from pathlib import Path
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QTabWidget, QWidget, QVBoxLayout,
QHBoxLayout, QLabel, QMenuBar, QMenu, QMessageBox, QStatusBar
QHBoxLayout, QLabel, QMenuBar, QMenu, QMessageBox, QStatusBar,
QPushButton
)
from PyQt6.QtCore import Qt, QTimer
from PyQt6.QtGui import QAction
from PyQt6.QtGui import QAction, QFont
# Import database
from database.init_database import DatabaseManager
@ -185,6 +186,7 @@ class MainWindow(QMainWindow):
try:
from session_widget import SessionWidget
self.session_widget = SessionWidget(self.db_manager)
self.session_widget.set_main_window(self) # Pass reference for tab control
self.tabs.addTab(self.session_widget, "Session")
except ImportError as e:
# Fallback to placeholder if SessionWidget not available
@ -247,7 +249,20 @@ class MainWindow(QMainWindow):
self.tab_graph = self._create_placeholder_tab("Graph Visualization\n(Error loading)")
self.tabs.addTab(self.tab_graph, "Graph")
self.graph_widget = None
# Tab 7: Database Manager
try:
from database.database_manager_widget import DatabaseManagerWidget
self.db_manager_widget = DatabaseManagerWidget(db_manager=self.db_manager)
self.tabs.addTab(self.db_manager_widget, "Database Manager")
except Exception as e:
print(f"[WARN] Failed to load Database Manager widget: {e}")
import traceback
traceback.print_exc()
self.tab_db_manager = self._create_placeholder_tab("Database Manager\n(Error loading)")
self.tabs.addTab(self.tab_db_manager, "Database Manager")
self.db_manager_widget = None
def _create_placeholder_tab(self, title: str) -> QWidget:
"""Create placeholder tab widget."""
widget = QWidget()
@ -333,6 +348,13 @@ class MainWindow(QMainWindow):
database_menu.addSeparator()
action_backup = QAction("&Backup Database...", self)
action_backup.setShortcut("Ctrl+B")
action_backup.triggered.connect(self._on_backup_database)
database_menu.addAction(action_backup)
database_menu.addSeparator()
action_vacuum = QAction("&Vacuum Database", self)
action_vacuum.triggered.connect(self._on_vacuum_database)
database_menu.addAction(action_vacuum)
@ -340,12 +362,6 @@ class MainWindow(QMainWindow):
action_db_info = QAction("Database &Info...", self)
action_db_info.triggered.connect(self._on_database_info)
database_menu.addAction(action_db_info)
database_menu.addSeparator()
action_cleanup = QAction("&Cleanup Old Data...", self)
action_cleanup.triggered.connect(self._on_cleanup_data)
database_menu.addAction(action_cleanup)
# Help menu
help_menu = menubar.addMenu("&Help")
@ -600,14 +616,17 @@ class MainWindow(QMainWindow):
# Update all widgets with new database connection
if self.session_widget:
self.session_widget.db_manager = self.db_manager
self.session_widget.db_conn = self.db_manager.get_connection()
self.session_widget.refresh_profiles()
if self.configure_session_widget:
self.configure_session_widget.db_manager = self.db_manager
self.configure_session_widget.db_conn = self.db_manager.get_connection()
self.configure_session_widget._load_profiles()
if self.configure_interface_widget:
self.configure_interface_widget.db_manager = self.db_manager
self.configure_interface_widget.db_conn = self.db_manager.get_connection()
self.configure_interface_widget._load_profiles()
if self.uart_widget:
@ -634,6 +653,56 @@ class MainWindow(QMainWindow):
f"Failed to change database:\n{str(e)}"
)
def _on_backup_database(self):
"""Backup database to selected location."""
from PyQt6.QtWidgets import QFileDialog
import shutil
from datetime import datetime
# Generate default backup filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
db_basename = os.path.basename(self.db_path)
db_name_no_ext = os.path.splitext(db_basename)[0]
default_name = f"{db_name_no_ext}_backup_{timestamp}.db"
# Open save dialog
backup_path, _ = QFileDialog.getSaveFileName(
self,
"Backup Database",
os.path.join("./database", default_name),
"Database Files (*.db);;All Files (*)"
)
if not backup_path:
return # User cancelled
try:
# Ensure .db extension
if not backup_path.endswith('.db'):
backup_path += '.db'
# Copy database file
shutil.copy2(self.db_path, backup_path)
# Get file size
backup_size = os.path.getsize(backup_path)
size_str = self.db_manager.format_size(backup_size)
QMessageBox.information(
self,
"Backup Complete",
f"Database backed up successfully!\n\n"
f"Location: {backup_path}\n"
f"Size: {size_str}"
)
except Exception as e:
QMessageBox.critical(
self,
"Backup Failed",
f"Failed to backup database:\n{str(e)}"
)
def _on_vacuum_database(self):
"""Vacuum database."""
reply = QMessageBox.question(
@ -648,133 +717,152 @@ class MainWindow(QMainWindow):
QMessageBox.information(self, "Success", "Database vacuumed successfully!")
def _on_database_info(self):
"""Show database information."""
size_bytes, percentage, status = self.db_manager.check_size()
info = self.db_manager.get_table_info()
msg = f"Database: {self.db_path}\n\n"
msg += f"Size: {self.db_manager.format_size(size_bytes)} ({percentage:.1f}% of 2 GB)\n"
msg += f"Status: {status}\n\n"
msg += "Table Row Counts:\n"
for table, count in info.items():
msg += f" {table}: {count}\n"
QMessageBox.information(self, "Database Info", msg)
def _on_cleanup_data(self):
"""Cleanup old data."""
from PyQt6.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QListWidget, QPushButton, QLabel
"""Show comprehensive database statistics."""
from PyQt6.QtWidgets import QDialog, QVBoxLayout, QTextEdit
# Get list of sessions
try:
cursor = self.db_manager.get_connection().execute("""
SELECT session_id, session_name, total_runs,
(SELECT COUNT(*) FROM telemetry_decoded WHERE telemetry_decoded.session_id = sessions.session_id) as row_count
FROM sessions
ORDER BY session_id DESC
""")
sessions = cursor.fetchall()
if not sessions:
QMessageBox.information(self, "Cleanup", "No sessions found in database.")
return
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load sessions:\n{str(e)}")
return
# Create dialog
dialog = QDialog(self)
dialog.setWindowTitle("Cleanup Old Data")
dialog.setMinimumSize(600, 400)
layout = QVBoxLayout()
dialog.setLayout(layout)
# Instructions
label = QLabel("Select sessions to delete:")
layout.addWidget(label)
# Session list
session_list = QListWidget()
session_list.setSelectionMode(QListWidget.SelectionMode.MultiSelection)
for session_id, session_name, total_runs, row_count in sessions:
item_text = f"{session_id} - {session_name} ({total_runs} runs, {row_count} rows)"
session_list.addItem(item_text)
layout.addWidget(session_list)
# Buttons
btn_layout = QHBoxLayout()
btn_select_all = QPushButton("Select All")
btn_select_all.clicked.connect(session_list.selectAll)
btn_layout.addWidget(btn_select_all)
btn_clear = QPushButton("Clear Selection")
btn_clear.clicked.connect(session_list.clearSelection)
btn_layout.addWidget(btn_clear)
btn_layout.addStretch()
btn_delete = QPushButton("Delete Selected")
btn_delete.clicked.connect(lambda: self._delete_sessions(session_list, sessions, dialog))
btn_layout.addWidget(btn_delete)
btn_cancel = QPushButton("Cancel")
btn_cancel.clicked.connect(dialog.reject)
btn_layout.addWidget(btn_cancel)
layout.addLayout(btn_layout)
dialog.exec()
def _delete_sessions(self, session_list, sessions, dialog):
"""Delete selected sessions."""
selected_indices = [index.row() for index in session_list.selectedIndexes()]
if not selected_indices:
QMessageBox.warning(dialog, "No Selection", "Please select sessions to delete.")
return
# Get selected session IDs
session_ids = [sessions[i][0] for i in selected_indices]
# Confirm deletion
reply = QMessageBox.question(
dialog,
"Confirm Delete",
f"Delete {len(session_ids)} session(s)?\n\nThis action cannot be undone!",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
conn = self.db_manager.get_connection()
if reply != QMessageBox.StandardButton.Yes:
return
# === Database File Info ===
size_bytes, percentage, status = self.db_manager.check_size()
try:
conn = self.db_manager.get_connection()
for session_id in session_ids:
conn.execute("DELETE FROM telemetry_decoded WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM telemetry_raw WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.commit()
# === Session Statistics ===
cursor = conn.execute("SELECT COUNT(*) FROM sessions")
total_sessions = cursor.fetchone()[0]
# Vacuum database
self.db_manager.vacuum()
cursor = conn.execute("SELECT SUM(total_runs) FROM sessions")
total_runs = cursor.fetchone()[0] or 0
QMessageBox.information(
dialog,
"Success",
f"Deleted {len(session_ids)} session(s) successfully!\n\nDatabase vacuumed."
)
cursor = conn.execute("""
SELECT session_id, session_name, total_runs, created_at
FROM sessions
ORDER BY total_runs DESC
LIMIT 1
""")
most_active = cursor.fetchone()
dialog.accept()
cursor = conn.execute("""
SELECT MIN(created_at), MAX(created_at)
FROM sessions
""")
date_range = cursor.fetchone()
# === Command Statistics ===
cursor = conn.execute("SELECT COUNT(*) FROM uart_commands")
uart_commands = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(*) FROM i2c_commands")
i2c_commands = cursor.fetchone()[0]
# === Profile Statistics ===
cursor = conn.execute("SELECT COUNT(*) FROM interface_profiles")
interface_profiles = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(*) FROM session_profiles")
session_profiles = cursor.fetchone()[0]
# === Telemetry Statistics ===
cursor = conn.execute("SELECT COUNT(*) FROM telemetry_raw")
raw_records = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(*) FROM telemetry_decoded")
decoded_records = cursor.fetchone()[0]
# Build info message
msg = "=" * 60 + "\n"
msg += "DATABASE OVERVIEW\n"
msg += "=" * 60 + "\n\n"
# Database File
msg += "📁 DATABASE FILE\n"
msg += "-" * 60 + "\n"
msg += f"Path: {self.db_path}\n"
msg += f"Size: {self.db_manager.format_size(size_bytes)} / 2.0 GB ({percentage:.1f}%)\n"
msg += f"Status: {status}\n\n"
# Sessions
msg += "📊 SESSIONS\n"
msg += "-" * 60 + "\n"
msg += f"Total Sessions: {total_sessions}\n"
msg += f"Total Runs: {total_runs}\n"
if total_sessions > 0:
msg += f"Average Runs/Session: {total_runs / total_sessions:.1f}\n"
if most_active:
msg += f"Most Active Session: {most_active[1]} ({most_active[2]} runs)\n"
if date_range[0] and date_range[1]:
msg += f"Date Range: {date_range[0]} to {date_range[1]}\n"
msg += "\n"
# Commands
msg += "⚙️ COMMANDS\n"
msg += "-" * 60 + "\n"
msg += f"UART Commands: {uart_commands}\n"
msg += f"I2C Commands: {i2c_commands}\n"
msg += f"Total Commands: {uart_commands + i2c_commands}\n\n"
# Profiles
msg += "📝 PROFILES\n"
msg += "-" * 60 + "\n"
msg += f"Interface Profiles: {interface_profiles}\n"
msg += f"Session Profiles: {session_profiles}\n\n"
# Telemetry
msg += "📈 TELEMETRY DATA\n"
msg += "-" * 60 + "\n"
msg += f"Raw Records: {raw_records:,}\n"
msg += f"Decoded Records: {decoded_records:,}\n"
msg += f"Total Data Points: {(raw_records + decoded_records):,}\n"
if decoded_records > 0:
msg += f"Avg Data Points/Run: {decoded_records / max(1, total_runs):.0f}\n"
msg += "\n"
# Table sizes
msg += "🗄️ TABLE DETAILS\n"
msg += "-" * 60 + "\n"
# Get all table names
cursor = conn.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
tables = cursor.fetchall()
# Get row count for each table
for table_row in tables:
table_name = table_row[0]
try:
cursor = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
msg += f"{table_name:30s}: {count:>10,} rows\n"
except Exception as e:
msg += f"{table_name:30s}: Error\n"
# Create dialog
dialog = QDialog(self)
dialog.setWindowTitle("Database Information")
dialog.resize(700, 600)
layout = QVBoxLayout()
dialog.setLayout(layout)
# Text display
text_edit = QTextEdit()
text_edit.setReadOnly(True)
text_edit.setPlainText(msg)
text_edit.setFont(QFont("Monospace", 9))
layout.addWidget(text_edit)
# Close button
close_btn = QPushButton("Close")
close_btn.clicked.connect(dialog.accept)
layout.addWidget(close_btn)
dialog.exec()
except Exception as e:
QMessageBox.critical(dialog, "Error", f"Failed to delete sessions:\n{str(e)}")
QMessageBox.critical(self, "Error", f"Failed to get database info:\n{str(e)}")
def _on_about(self):
"""Show about dialog."""
msg = "vzug-e-hinge Test & Control System\n\n"

@ -332,6 +332,50 @@ class Session(QObject):
self.error_occurred.emit(f"Failed to load session: {str(e)}")
return False
def unload_session(self):
"""
Unload session and close all hardware ports.
Called when session ends (complete, stop, or error) to:
- Close UART command, UART logger, and I2C ports
- Clear session state variables
- Reset execution flags
NOTE: Does NOT clear UI selections (session name, profile IDs).
Those are preserved to allow easy re-run.
"""
try:
# Close all ports
self._close_ports()
# Clear session state
self.current_session_id = None
# Keep session_name for display purposes
self.interface_profile_id = None
self.session_profile_id = None
# Clear phase data
self.phases = []
self.total_commands = 0
self.total_uart_runs = 0
self.total_i2c_commands = 0
self.current_command_index = 0
self.current_phase_name = ""
# Reset I2C zero reference
self.i2c_zero_reference = 0
# Reset execution flags
self.is_running = False
self.is_paused = False
self.pause_queued = False
self.stop_queued = False
self.status_changed.emit("Session unloaded")
except Exception as e:
self.error_occurred.emit(f"Error unloading session: {str(e)}")
def _load_phase_profile(self, profile_id: int, phase_name: str) -> tuple[bool, Optional[dict]]:
"""
Load a single session profile for a phase.
@ -628,31 +672,50 @@ class Session(QObject):
def _close_ports(self):
"""
Close UART and I2C ports.
Called at end of session or on error.
Ensures clean shutdown of hardware interfaces.
"""
import time
try:
# Close UART Command port
if self.uart_command_port:
uart_stop_reader(self.uart_command_port)
uart_close(self.uart_command_port)
self.uart_command_port = None
self.status_changed.emit("UART Command closed")
try:
uart_stop_reader(self.uart_command_port)
time.sleep(0.1) # Give reader thread time to stop
uart_close(self.uart_command_port)
self.uart_command_port = None
self.status_changed.emit("UART Command closed")
except Exception as e:
self.status_changed.emit(f"Warning: Error closing UART Command: {e}")
self.uart_command_port = None
# Close UART Logger port
if self.uart_logger_port:
uart_stop_reader(self.uart_logger_port)
uart_close(self.uart_logger_port)
self.uart_logger_port = None
self.status_changed.emit("UART Logger closed")
try:
uart_stop_reader(self.uart_logger_port)
time.sleep(0.1) # Give reader thread time to stop
uart_close(self.uart_logger_port)
self.uart_logger_port = None
self.status_changed.emit("UART Logger closed")
except Exception as e:
self.status_changed.emit(f"Warning: Error closing UART Logger: {e}")
self.uart_logger_port = None
# Close I2C
if self.i2c_handle:
i2c_close(self.i2c_handle)
self.i2c_handle = None
self.status_changed.emit("I2C closed")
try:
i2c_close(self.i2c_handle)
self.i2c_handle = None
self.status_changed.emit("I2C closed")
except Exception as e:
self.status_changed.emit(f"Warning: Error closing I2C: {e}")
self.i2c_handle = None
# Give OS time to fully release the ports
time.sleep(0.2)
except Exception as e:
self.error_occurred.emit(f"Error closing ports: {str(e)}")

@ -81,28 +81,31 @@ class SessionWidget(QWidget):
def __init__(self, db_manager: DatabaseManager):
"""
Initialize session widget.
Args:
db_manager: Database manager instance
"""
super().__init__()
# Store database manager
self.db_manager = db_manager
self.db_conn = db_manager.get_connection()
# Create session object
self.session = Session(db_manager)
# Worker thread for non-blocking execution
self.worker = None
# Reference to main window (for tab control)
self.main_window = None
# Connect signals from session to widget slots
self._connect_signals()
# Initialize UI
self._init_ui()
# Load initial data
self._load_profiles()
@ -131,7 +134,16 @@ class SessionWidget(QWidget):
self.session_name_input.setPlaceholderText("Auto-generated if empty")
name_layout.addWidget(self.session_name_input)
config_layout.addLayout(name_layout)
# Session notes input
notes_layout = QVBoxLayout()
notes_layout.addWidget(QLabel("Session Notes:"))
self.session_notes_input = QTextEdit()
self.session_notes_input.setMaximumHeight(60)
self.session_notes_input.setPlaceholderText("Optional: Add description or notes for this session...")
notes_layout.addWidget(self.session_notes_input)
config_layout.addLayout(notes_layout)
# Interface profile dropdown
interface_layout = QHBoxLayout()
interface_layout.addWidget(QLabel("Interface Profile:"))
@ -160,12 +172,17 @@ class SessionWidget(QWidget):
deinit_layout.addWidget(self.deinit_session_combo)
config_layout.addLayout(deinit_layout)
# Load button
# Load and Unload buttons
load_btn_layout = QHBoxLayout()
load_btn_layout.addStretch()
self.load_button = QPushButton("Load Session")
self.load_button.clicked.connect(self._on_load_clicked)
load_btn_layout.addWidget(self.load_button)
self.unload_button = QPushButton("Unload Session")
self.unload_button.clicked.connect(self._on_unload_clicked)
self.unload_button.setEnabled(False) # Initially disabled
load_btn_layout.addWidget(self.unload_button)
config_layout.addLayout(load_btn_layout)
config_group.setLayout(config_layout)
@ -327,6 +344,15 @@ class SessionWidget(QWidget):
Called when Session tab becomes active.
"""
self._load_profiles()
def set_main_window(self, main_window):
"""
Set reference to main window for tab control.
Args:
main_window: MainWindow instance
"""
self.main_window = main_window
# =========================================================================
# BUTTON HANDLERS
@ -355,6 +381,46 @@ class SessionWidget(QWidget):
if not session_name:
session_name = None # Will be auto-generated
# Check if session name already exists in database
if session_name:
cursor = self.db_conn.execute("""
SELECT COUNT(*) FROM sessions WHERE session_name = ?
""", (session_name,))
count = cursor.fetchone()[0]
if count > 0:
# Session name exists - prompt user
from PyQt6.QtWidgets import QMessageBox
reply = QMessageBox.question(
self,
"Session Name Exists",
f"Session name '{session_name}' already exists in database.\n\n"
f"Override will delete the existing session and all its telemetry data.\n\n"
f"What would you like to do?",
QMessageBox.StandardButton.Cancel | QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.Cancel
)
# Change OK button text to "Override"
if reply == QMessageBox.StandardButton.Cancel:
self._log_info("Load cancelled - session name already exists")
return
elif reply == QMessageBox.StandardButton.Ok:
# User chose to override - delete existing session data
self._log_info(f"Overriding existing session '{session_name}'...")
try:
# Delete from sessions table
self.db_conn.execute("DELETE FROM sessions WHERE session_name = ?", (session_name,))
# Delete from telemetry_raw table
self.db_conn.execute("DELETE FROM telemetry_raw WHERE session_name = ?", (session_name,))
# Delete from telemetry_decoded table
self.db_conn.execute("DELETE FROM telemetry_decoded WHERE session_name = ?", (session_name,))
self.db_conn.commit()
self._log_info(f"Existing session '{session_name}' deleted")
except Exception as e:
self._log_error(f"Failed to delete existing session: {e}")
return
# Load session with 3 phases
success = self.session.load_session(
interface_profile_id=interface_id,
@ -365,9 +431,13 @@ class SessionWidget(QWidget):
)
if success:
# Enable start button
# Enable start button and unload button
self.start_button.setEnabled(True)
self.unload_button.setEnabled(True)
self._log_info("Multi-phase session ready to start")
# Disable UART and I2C tabs to prevent port conflicts
self._disable_port_tabs()
else:
self._log_error("Failed to load session")
@ -375,12 +445,14 @@ class SessionWidget(QWidget):
"""Handle start button click."""
# Disable controls during startup
self.start_button.setEnabled(False)
self.unload_button.setEnabled(False)
self.load_button.setEnabled(False)
self.interface_profile_combo.setEnabled(False)
self.init_session_combo.setEnabled(False)
self.execute_session_combo.setEnabled(False)
self.deinit_session_combo.setEnabled(False)
self.session_name_input.setEnabled(False)
self.session_notes_input.setEnabled(False)
# Create worker thread for non-blocking execution
# Pass database path for thread-local connection
@ -425,32 +497,40 @@ class SessionWidget(QWidget):
"""Handle stop button click."""
self.session.stop_session()
# Button states will be updated by signal handlers
def _on_unload_clicked(self):
"""Handle unload session button click."""
# Unload the session and re-enable UART/I2C tabs
self._cleanup_session()
self._log_info("Session manually unloaded by user")
@pyqtSlot(bool)
def _on_worker_finished(self, success: bool):
"""
Handle worker thread finished.
Called when session execution completes (success or failure).
Re-enables UI controls for next session.
"""
# Cleanup worker
if self.worker:
self.worker = None
# Re-enable controls
self.start_button.setEnabled(True)
# Re-enable configuration controls
# NOTE: Start button will be disabled by _cleanup_session() since session is unloaded
# self.start_button.setEnabled(True) # <-- REMOVED: Don't enable, session is unloaded
self.load_button.setEnabled(True)
self.interface_profile_combo.setEnabled(True)
self.init_session_combo.setEnabled(True)
self.execute_session_combo.setEnabled(True)
self.deinit_session_combo.setEnabled(True)
self.session_name_input.setEnabled(True)
self.session_notes_input.setEnabled(True)
# Disable pause/stop buttons
self.pause_button.setEnabled(False)
self.stop_button.setEnabled(False)
if not success:
self._log_error("Session execution failed")
@ -463,6 +543,19 @@ class SessionWidget(QWidget):
"""Handle session started signal."""
self._update_status("Running", "green")
self._log_info(f"Session started: {session_id}")
# Save session notes to database
notes = self.session_notes_input.toPlainText().strip()
if notes:
try:
self.db_conn.execute("""
UPDATE sessions
SET notes = ?
WHERE session_id = ?
""", (notes, session_id))
self.db_conn.commit()
except Exception as e:
print(f"[WARN] Failed to save session notes: {e}")
@pyqtSlot(int, str)
def _on_command_started(self, command_no: int, command_name: str):
@ -504,10 +597,13 @@ class SessionWidget(QWidget):
"""Handle session finished signal."""
self._update_status("Finished", "green")
self.executing_label.setText("Session completed successfully")
# Reset button states
self._reset_controls()
# Cleanup: unload session and re-enable port tabs
self._cleanup_session()
self._log_info("Session completed successfully ✓")
@pyqtSlot(str)
@ -515,9 +611,12 @@ class SessionWidget(QWidget):
"""Handle error signal."""
self._update_status("Error", "red")
self._log_error(error_msg)
# Reset button states
self._reset_controls()
# Cleanup: unload session and re-enable port tabs
self._cleanup_session()
@pyqtSlot(str)
def _on_status_changed(self, status_text: str):
@ -599,15 +698,17 @@ class SessionWidget(QWidget):
self.execute_session_combo.setEnabled(True)
self.deinit_session_combo.setEnabled(True)
self.session_name_input.setEnabled(True)
self.session_notes_input.setEnabled(True)
# Reset button states
self.start_button.setEnabled(True)
# NOTE: Start button will be disabled by _cleanup_session() since session is unloaded
# self.start_button.setEnabled(True) # <-- REMOVED: Don't enable, session is unloaded
self.pause_button.setVisible(True)
self.pause_button.setEnabled(False)
self.resume_button.setVisible(False)
self.resume_button.setEnabled(False)
self.stop_button.setEnabled(False)
# Reset labels
self.executing_label.setText("Executing: ---")
self.command_label.setText("Command: 0 / 0")
@ -628,7 +729,7 @@ class SessionWidget(QWidget):
def _log_error(self, message: str):
"""
Log error message to data monitor.
Args:
message: Error message to log
"""
@ -638,6 +739,68 @@ class SessionWidget(QWidget):
self.log_display.verticalScrollBar().maximum()
)
def _disable_port_tabs(self):
"""
Disable UART and I2C tabs to prevent port conflicts.
Called when session is loaded.
"""
if not self.main_window:
return
try:
# UART tab is at index 3, I2C tab is at index 4
self.main_window.tabs.setTabEnabled(3, False) # UART
self.main_window.tabs.setTabEnabled(4, False) # I2C
self._log_info("UART and I2C tabs disabled (port conflict prevention)")
except Exception as e:
print(f"[WARN] Failed to disable port tabs: {e}")
def _enable_port_tabs(self):
"""
Re-enable UART and I2C tabs.
Called when session ends.
"""
if not self.main_window:
return
try:
# UART tab is at index 3, I2C tab is at index 4
self.main_window.tabs.setTabEnabled(3, True) # UART
self.main_window.tabs.setTabEnabled(4, True) # I2C
self._log_info("UART and I2C tabs re-enabled")
except Exception as e:
print(f"[WARN] Failed to enable port tabs: {e}")
def _cleanup_session(self):
"""
Cleanup after session ends.
- Unload session (closes ports)
- Re-enable UART/I2C tabs
- Disable Start/Pause/Stop/Unload buttons (no session loaded)
- Keep UI selections intact
"""
try:
# Unload session (closes all ports)
if self.session:
self.session.unload_session()
self._log_info("Session unloaded, ports closed")
# Re-enable port tabs
self._enable_port_tabs()
# Disable Start/Pause/Stop/Unload buttons (session is unloaded)
self.start_button.setEnabled(False)
self.pause_button.setEnabled(False)
self.stop_button.setEnabled(False)
self.unload_button.setEnabled(False)
# NOTE: We deliberately do NOT clear the UI selections
# (session_name_input, interface_profile_combo, etc.)
# so the user can easily re-run the same configuration
except Exception as e:
self._log_error(f"Cleanup error: {e}")
# =============================================================================
# MAIN (for testing)

6615
test.csv

File diff suppressed because it is too large Load Diff

@ -137,8 +137,8 @@ class UARTConfig:
baudrate: int
data_bits: int = 8
stop_bits: int = 1
parity: str = 'E'
buffer_size: int = 0.256 * 1024 * 1024 # 4MB default buffer
parity: str = 'N'
buffer_size: int = 256 * 1024 # 256KB default buffer
read_chunk_size: int = 512
stop_mode: StopConditionMode = StopConditionMode.TIMEOUT

Loading…
Cancel
Save