Working od desing

main
Kynsight 3 weeks ago
parent c22cab503b
commit e23e2e24b2

Binary file not shown.

@ -40,7 +40,7 @@ class PlotConfig:
"""Plot configuration."""
title: str = "Telemetry Data"
xlabel: str = "Time (s)"
figsize: Tuple[int, int] = (12, 8)
figsize: Tuple[int, int] = (16, 10) # Larger default size for better display
dpi: int = 100
grid: bool = True
legend: bool = True
@ -48,6 +48,11 @@ class PlotConfig:
linestyle: str = "-" # Line style
marker: Optional[str] = None # Marker style
markersize: int = 3 # Marker size
# Font sizes (smaller for Raspberry Pi displays)
title_fontsize: int = 10
label_fontsize: int = 8
tick_fontsize: int = 7
legend_fontsize: int = 7
# =============================================================================
@ -92,25 +97,32 @@ def plot_overlay(
if x_data is None or y_data is None:
continue
# Updated label format: "session_name run_no (name)"
label = f"{data.session_id} {data.run_no} ({data.session_name})"
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(x_data, y_data, label=label, alpha=0.8,
linestyle=config.linestyle, marker=config.marker,
markersize=config.markersize)
# Formatting
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(config.title)
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(ylabel, fontsize=config.label_fontsize)
ax.set_title(config.title, fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
# Legend at bottom, outside plot
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1]) # Leave space for legend
else:
fig.tight_layout()
else:
fig.tight_layout()
return fig
@ -170,10 +182,11 @@ def plot_subplots(
ax.plot(x_data, y_data, alpha=0.8,
linestyle=config.linestyle, marker=config.marker,
markersize=config.markersize)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
# Updated title format
ax.set_title(f"{data.session_id} {data.run_no} ({data.session_name})")
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(ylabel, fontsize=config.label_fontsize)
# Title format: "session_name (run_no) [command]"
ax.set_title(f"{data.session_name} ({data.run_no}) [{data.command_name}]", fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
@ -229,8 +242,8 @@ def plot_comparison(
# Plot reference as baseline (zero)
ax.axhline(y=0, color='black', linestyle='--',
# Updated label format
label=f'Reference: {reference.session_id} {reference.run_no} ({reference.session_name})')
# Label format: "session_name (run_no) [command] - Data"
label=f'Reference: {reference.session_name} ({reference.run_no}) [{reference.command_name}]')
# Set color cycle
ax.set_prop_cycle(color=plt.cm.tab10.colors)
@ -255,25 +268,32 @@ def plot_comparison(
# Calculate deviation
deviation = y_interp - ref_y
# Updated label format
label = f"{data.session_id} {data.run_no} ({data.session_name})"
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(ref_x, deviation, label=label, alpha=0.8,
linestyle=config.linestyle, marker=config.marker,
markersize=config.markersize)
# Formatting
ax.set_xlabel(xlabel)
ax.set_ylabel(f"Deviation in {ylabel}")
ax.set_title(f"{config.title} - Drift Analysis")
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(f"Deviation in {ylabel}", fontsize=config.label_fontsize)
ax.set_title(f"{config.title} - Drift Analysis", fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
# Legend at bottom
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1])
else:
fig.tight_layout()
else:
fig.tight_layout()
return fig
@ -330,24 +350,28 @@ def plot_multi_series(
if x_data is None or y_data is None:
continue
# Updated label format
label = f"{data.session_id} {data.run_no} ({data.session_name})"
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(x_data, y_data, label=label, alpha=0.8,
linestyle=config.linestyle, marker=config.marker,
markersize=config.markersize)
ax.set_ylabel(ylabel)
ax.set_ylabel(ylabel, fontsize=config.label_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend and idx == 0: # Legend only on first subplot
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=2)
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=2, fontsize=config.legend_fontsize)
# X-label only on bottom subplot
axes[-1].set_xlabel(xlabel)
axes[-1].set_xlabel(xlabel, fontsize=config.label_fontsize)
fig.suptitle(config.title)
fig.suptitle(config.title, fontsize=config.title_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 0.96])
return fig
@ -394,26 +418,33 @@ def plot_xy_scatter(
if x_data is None or y_data is None:
continue
# Updated label format
label = f"{data.session_id} {data.run_no} ({data.session_name})"
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(x_data, y_data, label=label, alpha=0.8,
linestyle=config.linestyle, marker=config.marker or 'o',
markersize=config.markersize if config.marker else 2,
linewidth=1)
# Formatting
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_title(config.title)
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel(ylabel, fontsize=config.label_fontsize)
ax.set_title(config.title, fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
# Legend at bottom
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1])
else:
fig.tight_layout()
else:
fig.tight_layout()
return fig

@ -21,15 +21,24 @@ from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QComboBox,
QGroupBox, QTreeWidget, QTreeWidgetItem,
QCheckBox, QFileDialog, QMessageBox, QSplitter
QCheckBox, QFileDialog, QMessageBox, QSplitter,
QSizePolicy
)
from PyQt6.QtCore import Qt, pyqtSignal
from PyQt6.QtGui import QFont
from typing import Optional
try:
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
from matplotlib.figure import Figure
MATPLOTLIB_AVAILABLE = True
except Exception as e:
print(f"[WARN] Matplotlib import failed: {e}")
MATPLOTLIB_AVAILABLE = False
FigureCanvas = None
NavigationToolbar = None
Figure = None
# Import from table_query (data access layer)
from graph_table_query import (
@ -76,11 +85,27 @@ class GraphWidget(QWidget):
self.selected_runs = [] # List of (session_id, run_no) tuples
self.loaded_data = [] # List of TelemetryData objects
# Initialize matplotlib figure/canvas to None
self.figure = None
self.canvas = None
self.toolbar = None
# Build UI
try:
self._init_ui()
except Exception as e:
print(f"[ERROR] Failed to initialize graph widget UI: {e}")
import traceback
traceback.print_exc()
return
# Connect to database
try:
self._connect_database()
except Exception as e:
print(f"[ERROR] Failed to connect to database on startup: {e}")
import traceback
traceback.print_exc()
def _init_ui(self):
"""Create user interface."""
@ -97,8 +122,8 @@ class GraphWidget(QWidget):
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.addWidget(left_panel)
splitter.addWidget(right_panel)
splitter.setStretchFactor(0, 1) # Left: 25%
splitter.setStretchFactor(1, 3) # Right: 75%
splitter.setStretchFactor(0, 1) # Left: ~15%
splitter.setStretchFactor(1, 6) # Right: ~85%
layout.addWidget(splitter)
@ -115,30 +140,14 @@ class GraphWidget(QWidget):
layout = QVBoxLayout()
panel.setLayout(layout)
# 1. Database selector (top)
db_group = QGroupBox("Database")
db_layout = QVBoxLayout()
db_group.setLayout(db_layout)
db_row = QHBoxLayout()
# 1. Database selector (HIDDEN - configured via menu)
# Keep combo_database for internal compatibility, but don't display it
self.combo_database = QComboBox()
self.combo_database.setEditable(True)
self.combo_database.addItem("./database/ehinge.db")
self.combo_database.setCurrentText(self.db_path)
db_row.addWidget(self.combo_database)
btn_browse = QPushButton("Browse...")
btn_browse.clicked.connect(self._browse_database)
btn_browse.setMaximumWidth(80)
db_row.addWidget(btn_browse)
db_layout.addLayout(db_row)
btn_connect = QPushButton("🔗 Connect")
btn_connect.clicked.connect(self._connect_database)
db_layout.addWidget(btn_connect)
layout.addWidget(db_group)
# Note: Database path is controlled via main menu (Database > Change Database...)
# No visible UI controls here
# 2. Plot controls (right after database!)
plot_controls = self._create_plot_controls()
@ -159,7 +168,7 @@ class GraphWidget(QWidget):
tree_group.setLayout(tree_layout)
self.tree_sessions = QTreeWidget()
self.tree_sessions.setHeaderLabels(["Session / Run", "Samples", "Duration"])
self.tree_sessions.setHeaderLabels(["Session / Run", "Samples"])
self.tree_sessions.itemChanged.connect(self._on_selection_changed)
tree_layout.addWidget(self.tree_sessions)
@ -237,6 +246,9 @@ class GraphWidget(QWidget):
self.check_angular_velocity.setChecked(True) # Default
layout.addWidget(self.check_angular_velocity)
self.check_angular_acceleration = QCheckBox("angular_acceleration")
layout.addWidget(self.check_angular_acceleration)
return group
def _create_plot_panel(self):
@ -245,20 +257,19 @@ class GraphWidget(QWidget):
layout = QVBoxLayout()
panel.setLayout(layout)
# Remove all margins for maximum space
# Remove all margins and spacing for maximum space
layout.setSpacing(0)
layout.setContentsMargins(0, 0, 0, 0)
# Store layout reference for canvas recreation
self.plot_layout = layout
# Matplotlib canvas (FULL SPACE - top to bottom!)
self.figure = Figure(figsize=(10, 6), dpi=100)
self.canvas = FigureCanvas(self.figure)
layout.addWidget(self.canvas)
# Matplotlib toolbar (zoom, pan, save)
self.toolbar = NavigationToolbar(self.canvas, self)
layout.addWidget(self.toolbar)
# Matplotlib canvas - deferred initialization to avoid startup crash
# Canvas will be created when first plot is generated
placeholder_label = QLabel("Select sessions/runs and click 'Generate Plot' to visualize data")
placeholder_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
placeholder_label.setStyleSheet("color: #888888; font-size: 14pt;")
layout.addWidget(placeholder_label, 1) # stretch=1 to fill space
return panel
@ -326,7 +337,7 @@ class GraphWidget(QWidget):
"time_ms", "t_ns",
"motor_current", "encoder_value", "relative_encoder_value", "v24_pec_diff", "pwm",
"i2c_raw14", "i2c_zero_raw14", "i2c_delta_raw14", "i2c_angle_deg", "i2c_zero_angle_deg",
"angular_velocity"
"angular_velocity", "angular_acceleration"
])
self.combo_xy_x.setCurrentText("motor_current")
self.xy_row.addWidget(self.combo_xy_x)
@ -338,7 +349,7 @@ class GraphWidget(QWidget):
"time_ms", "t_ns",
"motor_current", "encoder_value", "relative_encoder_value", "v24_pec_diff", "pwm",
"i2c_raw14", "i2c_zero_raw14", "i2c_delta_raw14", "i2c_angle_deg", "i2c_zero_angle_deg",
"angular_velocity"
"angular_velocity", "angular_acceleration"
])
self.combo_xy_y.setCurrentText("pwm")
self.xy_row.addWidget(self.combo_xy_y)
@ -404,11 +415,15 @@ class GraphWidget(QWidget):
def _connect_database(self):
"""Connect to selected database."""
try:
db_path = self.combo_database.currentText()
# Close previous connection
if self.adapter:
try:
self.adapter.close()
except Exception:
pass
# Determine adapter type based on file extension
if db_path.endswith('.csv'):
@ -424,7 +439,12 @@ class GraphWidget(QWidget):
if self.adapter.connect():
self._load_sessions()
else:
QMessageBox.critical(self, "Connection Error", f"Failed to connect to database:\n{db_path}")
print(f"[WARN] Failed to connect to database: {db_path}")
except Exception as e:
print(f"[ERROR] Database connection failed: {e}")
import traceback
traceback.print_exc()
self.adapter = None
# =========================================================================
# Session/Run Loading
@ -432,15 +452,22 @@ class GraphWidget(QWidget):
def _load_sessions(self):
"""Load sessions from database and populate tree."""
try:
if not self.adapter:
return
self.sessions = self.adapter.get_sessions()
self.tree_sessions.clear()
if not self.sessions:
print("[INFO] No sessions found in database")
return
for session in self.sessions:
# Create session item
# Create session item with date
session_item = QTreeWidgetItem(self.tree_sessions)
session_item.setText(0, session.session_id)
session_item.setText(0, f"{session.session_id} ({session.created_at})")
session_item.setText(1, f"{session.run_count} runs")
session_item.setText(2, session.created_at)
session_item.setFlags(session_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
session_item.setCheckState(0, Qt.CheckState.Unchecked)
session_item.setData(0, Qt.ItemDataRole.UserRole, session.session_id)
@ -450,14 +477,17 @@ class GraphWidget(QWidget):
for run in runs:
run_item = QTreeWidgetItem(session_item)
run_item.setText(0, f"Run {run.run_number} ({run.session_name})")
run_item.setText(0, f"Run {run.run_number}: {run.command_name}")
run_item.setText(1, f"{run.sample_count} samples")
run_item.setText(2, f"{run.duration_ms:.1f}ms")
run_item.setFlags(run_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
run_item.setCheckState(0, Qt.CheckState.Unchecked)
run_item.setData(0, Qt.ItemDataRole.UserRole, (session.session_id, run.run_number))
self.tree_sessions.expandAll()
except Exception as e:
print(f"[ERROR] Failed to load sessions: {e}")
import traceback
traceback.print_exc()
def _on_selection_changed(self, item: QTreeWidgetItem, column: int):
"""Handle checkbox changes in tree."""
@ -575,6 +605,10 @@ class GraphWidget(QWidget):
series.append('angular_velocity')
labels.append('Angular Velocity')
if self.check_angular_acceleration.isChecked():
series.append('angular_acceleration')
labels.append('Angular Acceleration')
return series, labels
def _generate_plot(self):
@ -607,7 +641,7 @@ class GraphWidget(QWidget):
config = PlotConfig(
title="Telemetry Data Visualization",
figsize=(10, 6),
# Use default figsize (16, 10) for larger plots
linestyle=linestyle,
marker=marker,
markersize=markersize
@ -722,6 +756,9 @@ class GraphWidget(QWidget):
line_type = self.combo_line_type.currentText()
linestyle, marker, markersize = self._get_line_style(line_type)
# Track if any data was plotted
plotted_any = False
# Plot each combination of run × series
for data in data_list:
x_data = getattr(data, x_col, None)
@ -735,24 +772,40 @@ class GraphWidget(QWidget):
if y_data is None:
continue
# Label format: "session_name run_no (name) - series"
label = f"{data.session_id} {data.run_no} ({data.session_name}) - {ylabel}"
plotted_any = True
# Label format: "session_name (run_no) [command] - Data"
label = f"{data.session_name} ({data.run_no}) [{data.command_name}] - {ylabel}"
ax.plot(x_data, y_data, label=label, alpha=0.8,
linestyle=linestyle, marker=marker, markersize=markersize)
# Check if any data was plotted
if not plotted_any:
ax.text(0.5, 0.5, 'No data available for selected series\n(columns may not exist in database)',
ha='center', va='center', transform=ax.transAxes,
fontsize=config.label_fontsize, color='red')
# Formatting
ax.set_xlabel(xlabel)
ax.set_ylabel("Value")
ax.set_title(config.title)
ax.set_xlabel(xlabel, fontsize=config.label_fontsize)
ax.set_ylabel("Value", fontsize=config.label_fontsize)
ax.set_title(config.title, fontsize=config.title_fontsize)
ax.tick_params(axis='both', labelsize=config.tick_fontsize)
if config.grid:
ax.grid(True, alpha=0.3)
if config.legend:
# Only add legend if there are labeled artists
handles, labels = ax.get_legend_handles_labels()
if handles and labels:
# Legend at bottom, outside plot, 2 columns
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.12), ncol=2, fontsize=config.legend_fontsize)
fig.tight_layout(rect=[0, 0.08, 1, 1]) # Leave space for legend
else:
# No data plotted - just tight layout without legend space
fig.tight_layout()
else:
fig.tight_layout()
return fig
@ -779,6 +832,13 @@ class GraphWidget(QWidget):
from PyQt6.QtWidgets import QApplication
import matplotlib.pyplot as plt
if not MATPLOTLIB_AVAILABLE:
error_label = QLabel("Matplotlib not available - cannot generate plots")
error_label.setStyleSheet("color: red;")
self.plot_layout.addWidget(error_label)
return
try:
# Close old matplotlib figure to free memory
if hasattr(self, 'figure') and self.figure is not None:
try:
@ -786,39 +846,55 @@ class GraphWidget(QWidget):
except Exception:
pass
# Remove old canvas and toolbar safely
if hasattr(self, 'canvas') and self.canvas is not None:
try:
self.plot_layout.removeWidget(self.canvas)
self.plot_layout.removeWidget(self.toolbar)
self.canvas.close()
self.toolbar.close()
self.canvas.deleteLater()
self.toolbar.deleteLater()
self.canvas = None
self.toolbar = None
# Remove old widgets from layout (canvas, toolbar, or placeholder)
# Clear all widgets from plot_layout
while self.plot_layout.count():
item = self.plot_layout.takeAt(0)
if item.widget():
widget = item.widget()
widget.deleteLater()
# Process events to ensure cleanup
QApplication.processEvents()
except RuntimeError:
# Widget already deleted
pass
# Store figure reference
self.figure = figure
# Create fresh canvas and toolbar
self.canvas = FigureCanvas(figure)
# Set canvas size policy to expand and fill available space
self.canvas.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
# Set minimum size to ensure it's visible
self.canvas.setMinimumSize(400, 300)
# Enable tight layout for better space usage
try:
figure.set_tight_layout(True)
except:
pass
self.toolbar = NavigationToolbar(self.canvas, self)
# Add to layout
self.plot_layout.insertWidget(1, self.canvas) # After controls
self.plot_layout.insertWidget(2, self.toolbar) # After canvas
# Add to layout with stretch factor for canvas
self.plot_layout.addWidget(self.canvas, 1) # stretch=1 to take all available space
self.plot_layout.addWidget(self.toolbar) # toolbar stays at natural height
# Force canvas to resize to fill available space
self.canvas.updateGeometry()
# Draw the canvas
try:
self.canvas.draw()
except Exception as e:
print(f"Warning: Canvas draw failed: {e}")
print(f"[ERROR] Failed to update canvas: {e}")
import traceback
traceback.print_exc()
# Add error label
error_label = QLabel(f"Plot generation failed:\n{str(e)}")
error_label.setStyleSheet("color: red;")
self.plot_layout.addWidget(error_label)
def _show_plot_window(self, figure):
"""Show plot in separate window."""
@ -924,6 +1000,24 @@ class GraphWidget(QWidget):
# Cleanup
# =========================================================================
def set_database_path(self, new_db_path: str) -> bool:
"""
Change database path (called from main menu).
Args:
new_db_path: New database file path
Returns:
True if successfully connected, False otherwise
"""
try:
self.combo_database.setCurrentText(new_db_path)
self._connect_database()
return True
except Exception as e:
print(f"[ERROR] Failed to change database: {e}")
return False
def closeEvent(self, event):
"""Close database connection on widget close."""
if self.adapter:

@ -51,6 +51,7 @@ class RunInfo:
start_time_ns: int
end_time_ns: int
duration_ms: float
command_name: str = "Unknown" # UART or I2C command name
@dataclass
@ -59,6 +60,7 @@ class TelemetryData:
session_id: str
session_name: str
run_no: int
command_name: str = "Unknown" # UART or I2C command name
# Time axes
t_ns: Optional[np.ndarray] = None
@ -80,6 +82,7 @@ class TelemetryData:
# Derived data
angular_velocity: Optional[np.ndarray] = None
angular_acceleration: Optional[np.ndarray] = None
# =============================================================================
@ -229,16 +232,20 @@ class SQLiteAdapter(DataAdapter):
cursor.execute("""
SELECT
session_id,
session_name,
run_no,
t.session_id,
t.session_name,
t.run_no,
t.run_command_id,
COUNT(*) as sample_count,
MIN(t_ns) as start_time_ns,
MAX(t_ns) as end_time_ns
FROM telemetry_decoded
WHERE session_id = ?
GROUP BY session_id, run_no, session_name
ORDER BY run_no
MIN(t.t_ns) as start_time_ns,
MAX(t.t_ns) as end_time_ns,
COALESCE(u.command_name, i.command_name, 'Unknown') as command_name
FROM telemetry_decoded t
LEFT JOIN uart_commands u ON t.run_command_id = u.command_id
LEFT JOIN i2c_commands i ON t.run_command_id = i.command_id
WHERE t.session_id = ?
GROUP BY t.session_id, t.run_no, t.session_name, t.run_command_id
ORDER BY t.run_no
""", (session_id,))
runs = []
@ -253,7 +260,8 @@ class SQLiteAdapter(DataAdapter):
sample_count=row['sample_count'],
start_time_ns=row['start_time_ns'],
end_time_ns=row['end_time_ns'],
duration_ms=duration_ms
duration_ms=duration_ms,
command_name=row['command_name']
))
return runs
@ -272,23 +280,27 @@ class SQLiteAdapter(DataAdapter):
cursor.execute("""
SELECT
session_name,
t_ns,
time_ms,
motor_current,
encoder_value,
relative_encoder_value,
v24_pec_diff,
pwm,
i2c_raw14,
i2c_zero_raw14,
i2c_delta_raw14,
i2c_angle_deg,
i2c_zero_angle_deg,
angular_velocity
FROM telemetry_decoded
WHERE session_id = ? AND run_no = ?
ORDER BY t_ns
t.session_name,
t.t_ns,
t.time_ms,
t.motor_current,
t.encoder_value,
t.relative_encoder_value,
t.v24_pec_diff,
t.pwm,
t.i2c_raw14,
t.i2c_zero_raw14,
t.i2c_delta_raw14,
t.i2c_angle_deg,
t.i2c_zero_angle_deg,
t.angular_velocity,
t.angular_acceleration,
COALESCE(u.command_name, i.command_name, 'Unknown') as command_name
FROM telemetry_decoded t
LEFT JOIN uart_commands u ON t.run_command_id = u.command_id
LEFT JOIN i2c_commands i ON t.run_command_id = i.command_id
WHERE t.session_id = ? AND t.run_no = ?
ORDER BY t.t_ns
""", (session_id, run_no))
rows = cursor.fetchall()
@ -296,14 +308,16 @@ class SQLiteAdapter(DataAdapter):
if not rows:
return None
# Get session_name from first row
# Get session_name and command_name from first row
session_name = rows[0]['session_name']
command_name = rows[0]['command_name']
# Extract columns
data = TelemetryData(
session_id=session_id,
session_name=session_name,
run_no=run_no,
command_name=command_name,
t_ns=self._extract_column(rows, 't_ns', dtype=np.int64),
time_ms=self._extract_column(rows, 'time_ms', dtype=np.int64),
motor_current=self._extract_column(rows, 'motor_current', dtype=np.float32),
@ -316,7 +330,8 @@ class SQLiteAdapter(DataAdapter):
i2c_delta_raw14=self._extract_column(rows, 'i2c_delta_raw14', dtype=np.float32),
i2c_angle_deg=self._extract_column(rows, 'i2c_angle_deg', dtype=np.float32),
i2c_zero_angle_deg=self._extract_column(rows, 'i2c_zero_angle_deg', dtype=np.float32),
angular_velocity=self._extract_column(rows, 'angular_velocity', dtype=np.float32)
angular_velocity=self._extract_column(rows, 'angular_velocity', dtype=np.float32),
angular_acceleration=self._extract_column(rows, 'angular_acceleration', dtype=np.float32)
)
return data
@ -509,7 +524,8 @@ class CSVAdapter(DataAdapter):
i2c_delta_raw14=self._extract_column_csv(run_df, 'i2c_delta_raw14', dtype=np.float32),
i2c_angle_deg=self._extract_column_csv(run_df, 'i2c_angle_deg', dtype=np.float32),
i2c_zero_angle_deg=self._extract_column_csv(run_df, 'i2c_zero_angle_deg', dtype=np.float32),
angular_velocity=self._extract_column_csv(run_df, 'angular_velocity', dtype=np.float32)
angular_velocity=self._extract_column_csv(run_df, 'angular_velocity', dtype=np.float32),
angular_acceleration=self._extract_column_csv(run_df, 'angular_acceleration', dtype=np.float32)
)
return data
@ -560,7 +576,8 @@ def get_available_columns() -> List[str]:
'i2c_delta_raw14',
'i2c_angle_deg',
'i2c_zero_angle_deg',
'angular_velocity'
'angular_velocity',
'angular_acceleration'
]
@ -587,7 +604,8 @@ def get_column_label(column: str) -> str:
'i2c_delta_raw14': 'I2C Delta Raw',
'i2c_angle_deg': 'Angle (degrees)',
'i2c_zero_angle_deg': 'Zero Angle (degrees)',
'angular_velocity': 'Angular Velocity'
'angular_velocity': 'Angular Velocity (deg/s)',
'angular_acceleration': 'Angular Acceleration (deg/s²)'
}
return labels.get(column, column)
@ -605,7 +623,7 @@ def get_column_groups() -> Dict[str, List[str]]:
'v24_pec_diff', 'pwm'],
'I2C': ['i2c_raw14', 'i2c_zero_raw14', 'i2c_delta_raw14',
'i2c_angle_deg', 'i2c_zero_angle_deg'],
'Derived': ['angular_velocity']
'Derived': ['angular_velocity', 'angular_acceleration']
}

@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
Kalman Filter for Angular Position/Velocity/Acceleration
=========================================================
1D Kalman filter for tracking door angle from noisy encoder readings.
State vector: [angle, velocity, acceleration]
Measurement: angle (from 14-bit I2C encoder)
Author: Kynsight
Version: 1.0.0
"""
import numpy as np
from typing import Tuple, Optional
class AngleKalmanFilter:
"""
Kalman filter for angular position tracking.
Tracks angle (deg), angular velocity (deg/s), and angular acceleration (deg/).
Designed for 14-bit encoder with 0.022° quantization noise.
"""
def __init__(
self,
process_noise: float = 0.1,
measurement_noise: float = 0.022,
initial_angle: float = 0.0
):
"""
Initialize Kalman filter.
Args:
process_noise: Process noise covariance (system uncertainty)
measurement_noise: Measurement noise (encoder quantization = 0.022°)
initial_angle: Initial angle estimate (degrees)
"""
# State vector: [angle, velocity, acceleration]
self.x = np.array([initial_angle, 0.0, 0.0])
# State covariance matrix (uncertainty in estimates)
self.P = np.eye(3) * 1.0
# Process noise covariance
self.Q = np.array([
[process_noise, 0, 0],
[0, process_noise * 10, 0], # Velocity more uncertain
[0, 0, process_noise * 100] # Acceleration even more uncertain
])
# Measurement noise covariance (14-bit encoder = 360/16384 = 0.022°)
self.R = np.array([[measurement_noise ** 2]])
# Measurement matrix (we only measure angle)
self.H = np.array([[1.0, 0.0, 0.0]])
# Previous timestamp for dt calculation
self.prev_time_ns: Optional[int] = None
# Filter initialized flag
self.initialized = False
def predict(self, dt: float):
"""
Predict step: propagate state forward in time.
State transition model:
angle(k+1) = angle(k) + velocity(k)*dt + 0.5*accel(k)*dt²
velocity(k+1) = velocity(k) + accel(k)*dt
accel(k+1) = accel(k) (constant acceleration model)
Args:
dt: Time step in seconds
"""
# State transition matrix
F = np.array([
[1.0, dt, 0.5 * dt**2],
[0.0, 1.0, dt],
[0.0, 0.0, 1.0]
])
# Predict state
self.x = F @ self.x
# Predict covariance
self.P = F @ self.P @ F.T + self.Q
def update(self, measurement: float):
"""
Update step: correct prediction with measurement.
Args:
measurement: Measured angle in degrees
"""
# Measurement residual
z = np.array([[measurement]])
y = z - self.H @ self.x.reshape(-1, 1)
# Residual covariance
S = self.H @ self.P @ self.H.T + self.R
# Kalman gain
K = self.P @ self.H.T @ np.linalg.inv(S)
# Update state
self.x = self.x + (K @ y).flatten()
# Update covariance
I = np.eye(3)
self.P = (I - K @ self.H) @ self.P
def process(
self,
angle_measurement: float,
timestamp_ns: int
) -> Tuple[float, float, float]:
"""
Process one measurement (predict + update).
Args:
angle_measurement: Raw angle measurement (degrees)
timestamp_ns: Timestamp in nanoseconds
Returns:
(filtered_angle, filtered_velocity, filtered_acceleration)
"""
if not self.initialized:
# First measurement - initialize state
self.x[0] = angle_measurement
self.x[1] = 0.0
self.x[2] = 0.0
self.prev_time_ns = timestamp_ns
self.initialized = True
return (angle_measurement, 0.0, 0.0)
# Calculate time step
dt = (timestamp_ns - self.prev_time_ns) / 1_000_000_000.0
self.prev_time_ns = timestamp_ns
if dt <= 0:
# Duplicate timestamp - return previous estimate
return (self.x[0], self.x[1], self.x[2])
# Predict
self.predict(dt)
# Update with measurement
self.update(angle_measurement)
return (self.x[0], self.x[1], self.x[2])
def reset(self, angle: float = 0.0):
"""
Reset filter to initial state.
Args:
angle: Initial angle estimate
"""
self.x = np.array([angle, 0.0, 0.0])
self.P = np.eye(3) * 1.0
self.prev_time_ns = None
self.initialized = False
def get_state(self) -> Tuple[float, float, float]:
"""
Get current state estimate.
Returns:
(angle, velocity, acceleration)
"""
return (self.x[0], self.x[1], self.x[2])
# =============================================================================
# Demo
# =============================================================================
if __name__ == "__main__":
print("Kalman Filter Demo")
print("=" * 60)
# Create filter
kf = AngleKalmanFilter(
process_noise=0.1,
measurement_noise=0.022, # 14-bit encoder quantization
initial_angle=-84.0
)
# Simulate noisy measurements
import random
print("\nSimulating noisy angle measurements:")
print(f"{'Time (ms)':<12} {'Raw Angle':<12} {'Filtered':<12} {'Velocity':<12} {'Accel':<12}")
print("-" * 60)
true_angle = -84.0
time_ns = 0
for i in range(20):
time_ns += 35_000_000 # ~35ms intervals (typical packet rate)
# Add quantization noise (0.022° steps)
noise = random.choice([-0.022, 0, 0.022])
measurement = true_angle + noise
# Process measurement
angle, vel, accel = kf.process(measurement, time_ns)
print(f"{time_ns/1e6:<12.1f} {measurement:<12.4f} {angle:<12.4f} {vel:<12.4f} {accel:<12.4f}")
# Slowly increase angle (simulate door opening)
true_angle += 0.02
print("\n✓ Filter smooths quantization noise while tracking motion")

@ -326,6 +326,13 @@ class MainWindow(QMainWindow):
# Database menu
database_menu = menubar.addMenu("&Database")
action_change_db = QAction("&Change Database...", self)
action_change_db.setShortcut("Ctrl+D")
action_change_db.triggered.connect(self._on_change_database)
database_menu.addAction(action_change_db)
database_menu.addSeparator()
action_vacuum = QAction("&Vacuum Database", self)
action_vacuum.triggered.connect(self._on_vacuum_database)
database_menu.addAction(action_vacuum)
@ -555,6 +562,78 @@ class MainWindow(QMainWindow):
# TODO: Implement
QMessageBox.information(self, "New Profile", "Create test profile (coming soon)")
def _on_change_database(self):
"""Change database file."""
from PyQt6.QtWidgets import QFileDialog
# Show file dialog
new_db_path, _ = QFileDialog.getOpenFileName(
self,
"Select Database File",
"./database",
"Database Files (*.db *.sqlite);;All Files (*)"
)
if not new_db_path:
return # User cancelled
# Confirm change
reply = QMessageBox.question(
self,
"Change Database",
f"Switch to database:\n{new_db_path}\n\nThis will reload all data. Continue?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply != QMessageBox.StandardButton.Yes:
return
try:
# Close current database
if self.db_manager:
self.db_manager.close()
# Update path and reconnect
self.db_path = new_db_path
self._init_database()
# Update all widgets with new database connection
if self.session_widget:
self.session_widget.db_manager = self.db_manager
self.session_widget.refresh_profiles()
if self.configure_session_widget:
self.configure_session_widget.db_manager = self.db_manager
self.configure_session_widget._load_profiles()
if self.configure_interface_widget:
self.configure_interface_widget.db_manager = self.db_manager
self.configure_interface_widget._load_profiles()
if self.uart_widget:
self.uart_widget.db_conn = self.db_manager.get_connection()
if self.i2c_widget:
self.i2c_widget.db_conn = self.db_manager.get_connection()
if self.graph_widget:
self.graph_widget.set_database_path(new_db_path)
QMessageBox.information(
self,
"Success",
f"Database changed successfully!\n\nNow using: {new_db_path}"
)
self.status_bar.showMessage(f"Database changed to: {new_db_path}")
except Exception as e:
QMessageBox.critical(
self,
"Error",
f"Failed to change database:\n{str(e)}"
)
def _on_vacuum_database(self):
"""Vacuum database."""
reply = QMessageBox.question(
@ -585,8 +664,116 @@ class MainWindow(QMainWindow):
def _on_cleanup_data(self):
"""Cleanup old data."""
# TODO: Implement
QMessageBox.information(self, "Cleanup", "Data cleanup dialog (coming soon)")
from PyQt6.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QListWidget, QPushButton, QLabel
# Get list of sessions
try:
cursor = self.db_manager.get_connection().execute("""
SELECT session_id, session_name, total_runs,
(SELECT COUNT(*) FROM telemetry_decoded WHERE telemetry_decoded.session_id = sessions.session_id) as row_count
FROM sessions
ORDER BY session_id DESC
""")
sessions = cursor.fetchall()
if not sessions:
QMessageBox.information(self, "Cleanup", "No sessions found in database.")
return
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load sessions:\n{str(e)}")
return
# Create dialog
dialog = QDialog(self)
dialog.setWindowTitle("Cleanup Old Data")
dialog.setMinimumSize(600, 400)
layout = QVBoxLayout()
dialog.setLayout(layout)
# Instructions
label = QLabel("Select sessions to delete:")
layout.addWidget(label)
# Session list
session_list = QListWidget()
session_list.setSelectionMode(QListWidget.SelectionMode.MultiSelection)
for session_id, session_name, total_runs, row_count in sessions:
item_text = f"{session_id} - {session_name} ({total_runs} runs, {row_count} rows)"
session_list.addItem(item_text)
layout.addWidget(session_list)
# Buttons
btn_layout = QHBoxLayout()
btn_select_all = QPushButton("Select All")
btn_select_all.clicked.connect(session_list.selectAll)
btn_layout.addWidget(btn_select_all)
btn_clear = QPushButton("Clear Selection")
btn_clear.clicked.connect(session_list.clearSelection)
btn_layout.addWidget(btn_clear)
btn_layout.addStretch()
btn_delete = QPushButton("Delete Selected")
btn_delete.clicked.connect(lambda: self._delete_sessions(session_list, sessions, dialog))
btn_layout.addWidget(btn_delete)
btn_cancel = QPushButton("Cancel")
btn_cancel.clicked.connect(dialog.reject)
btn_layout.addWidget(btn_cancel)
layout.addLayout(btn_layout)
dialog.exec()
def _delete_sessions(self, session_list, sessions, dialog):
"""Delete selected sessions."""
selected_indices = [index.row() for index in session_list.selectedIndexes()]
if not selected_indices:
QMessageBox.warning(dialog, "No Selection", "Please select sessions to delete.")
return
# Get selected session IDs
session_ids = [sessions[i][0] for i in selected_indices]
# Confirm deletion
reply = QMessageBox.question(
dialog,
"Confirm Delete",
f"Delete {len(session_ids)} session(s)?\n\nThis action cannot be undone!",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply != QMessageBox.StandardButton.Yes:
return
try:
conn = self.db_manager.get_connection()
for session_id in session_ids:
conn.execute("DELETE FROM telemetry_decoded WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM telemetry_raw WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.commit()
# Vacuum database
self.db_manager.vacuum()
QMessageBox.information(
dialog,
"Success",
f"Deleted {len(session_ids)} session(s) successfully!\n\nDatabase vacuumed."
)
dialog.accept()
except Exception as e:
QMessageBox.critical(dialog, "Error", f"Failed to delete sessions:\n{str(e)}")
def _on_about(self):
"""Show about dialog."""

Binary file not shown.

After

Width:  |  Height:  |  Size: 167 KiB

@ -21,6 +21,9 @@ import time
from typing import Tuple, Optional, List
import sqlite3
# Import global clock for timestamp synchronization
from global_clock import GlobalClock, now_ns as global_now_ns
# Import UART core
from uart.uart_kit.uart_core import (
UARTPort,
@ -50,6 +53,9 @@ from buffer_kit.circular_buffer import cb_fill_bytes, cb_capacity
# Import decoder
from decoder import decode_uart_packet, decode_i2c_sample
# Import Kalman filter
from kalman_filter import AngleKalmanFilter
class RunExecutor:
"""
@ -76,6 +82,14 @@ class RunExecutor:
self.i2c_failures = 0 # Counter for I2C read failures
self.i2c_zero_reference = 0 # Absolute angle used as zero (0 = not zeroed)
# For angular velocity/acceleration calculation
self.prev_angle_deg = None
self.prev_timestamp_ns = None
self.prev_velocity_deg_s = None
# Kalman filter for angle smoothing (reset per run)
self.kalman_filter = None
def execute_run(
self,
session_id: str,
@ -121,8 +135,21 @@ class RunExecutor:
self.i2c_readings.clear()
self.i2c_failures = 0 # Reset error counter
# Record run start time
run_start_ns = time.time_ns()
# Reset velocity/acceleration tracking for new run
self.prev_angle_deg = None
self.prev_timestamp_ns = None
self.prev_velocity_deg_s = None
# Reset Kalman filter for new run
self.kalman_filter = AngleKalmanFilter(
process_noise=0.1, # Tunable: system dynamics uncertainty
measurement_noise=0.022, # 14-bit encoder quantization
initial_angle=0.0 # Will be set by first measurement
)
# Record run start time (using global clock for consistency with packet timestamps)
# packet_info.start_timestamp is in nanoseconds (UART core converts internally)
run_start_ns = global_now_ns()
# ================================================================
# 1. Configure packet detection with callback (LOGGER PORT)
@ -425,7 +452,7 @@ class RunExecutor:
if not i2c_port:
return ("error", 0, "I2C port not available")
run_start_ns = time.time_ns()
run_start_ns = global_now_ns()
# Parse hex_string as byte count for reads
try:
@ -564,9 +591,28 @@ class RunExecutor:
decoded_uart = decode_uart_packet(packet_info.data)
decoded_i2c = decode_i2c_sample(i2c_bytes, self.i2c_zero_reference) if i2c_bytes else None
# Calculate relative time from run start
# Calculate relative time from run start (both in nanoseconds, convert to milliseconds)
time_ms = (packet_info.start_timestamp - run_start_ns) / 1_000_000.0
# Calculate angular velocity and acceleration using Kalman filter (if I2C data available)
angular_velocity = None
angular_acceleration = None
if decoded_i2c and decoded_i2c.get('i2c_angle_deg') is not None:
raw_angle = decoded_i2c.get('i2c_angle_deg')
current_time_ns = packet_info.start_timestamp # In nanoseconds
# Use Kalman filter for smoothed velocity/acceleration estimates
if self.kalman_filter is not None:
# Process measurement through Kalman filter
filtered_angle, angular_velocity, angular_acceleration = self.kalman_filter.process(
angle_measurement=raw_angle,
timestamp_ns=current_time_ns
)
# For duplicate timestamps (dt=0), filter returns previous estimates
# First packet returns velocity=0, accel=0 (initialized state)
# Save to telemetry_raw (backup) - BOTH uart_raw_packet AND i2c_raw_bytes in ONE row
cursor = self.db_conn.cursor()
cursor.execute("""
@ -579,7 +625,7 @@ class RunExecutor:
session_name,
run_no,
run_command_id,
packet_info.start_timestamp,
packet_info.start_timestamp, # Already in nanoseconds
time_ms,
packet_info.data,
i2c_bytes, # Can be None if no I2C
@ -592,14 +638,15 @@ class RunExecutor:
session_id, session_name, run_no, run_command_id,
t_ns, time_ms,
motor_current, encoder_value, relative_encoder_value, v24_pec_diff, pwm,
i2c_raw14, i2c_zero_raw14, i2c_delta_raw14, i2c_angle_deg, i2c_zero_angle_deg, i2c_zero_ref
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
i2c_raw14, i2c_zero_raw14, i2c_delta_raw14, i2c_angle_deg, i2c_zero_angle_deg,
angular_velocity, angular_acceleration, i2c_zero_ref
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
session_id,
session_name,
run_no,
run_command_id,
packet_info.start_timestamp,
packet_info.start_timestamp, # Already in nanoseconds
time_ms,
# UART decoded fields
decoded_uart.get('motor_current'),
@ -613,6 +660,9 @@ class RunExecutor:
decoded_i2c.get('i2c_delta_raw14') if decoded_i2c else None,
decoded_i2c.get('i2c_angle_deg') if decoded_i2c else None,
decoded_i2c.get('i2c_zero_angle_deg') if decoded_i2c else None,
# Derived fields
angular_velocity,
angular_acceleration,
self.i2c_zero_reference # Zero reference (0 if not zeroed)
))

@ -36,6 +36,9 @@ import json
import time
from datetime import datetime
# Global clock for timestamp synchronization
from global_clock import GlobalClock
# UART core
from uart.uart_kit.uart_core import (
UARTPort,
@ -454,7 +457,7 @@ class Session(QObject):
# 1. Open UART Command Port (TX/RX - ALWAYS NEEDED)
# ===================================================================
# Create UART command config
# Create UART command config (with GlobalClock for timestamps)
cmd_uart_config = UARTConfig(
device=self.interface_config['uart_command_port'],
baudrate=self.interface_config['uart_command_baud'],
@ -462,7 +465,8 @@ class Session(QObject):
stop_bits=self.interface_config['uart_command_stop_bits'],
parity=self.interface_config['uart_command_parity'],
buffer_size=40 * 1024 * 1024, # 40MB buffer
stop_timeout_ms=self.interface_config['uart_command_timeout_ms']
stop_timeout_ms=self.interface_config['uart_command_timeout_ms'],
timestamp_source=GlobalClock.instance().now # Use global clock
)
# Create UART command port
@ -496,7 +500,7 @@ class Session(QObject):
if self.interface_config['uart_logger_enable']:
# Logger enabled - open logger port
# Create UART logger config
# Create UART logger config (with GlobalClock for timestamps)
log_uart_config = UARTConfig(
device=self.interface_config['uart_logger_port'],
baudrate=self.interface_config['uart_logger_baud'],
@ -506,7 +510,8 @@ class Session(QObject):
buffer_size=40 * 1024 * 1024, # 40MB buffer
stop_timeout_ms=self.interface_config['uart_logger_timeout_ms'],
grace_timeout_ms=self.interface_config['uart_logger_grace_ms'],
polling_mode=True # Enable grace period for first byte
polling_mode=True, # Enable grace period for first byte
timestamp_source=GlobalClock.instance().now # Use global clock
)
# Create UART logger port
@ -588,10 +593,11 @@ class Session(QObject):
if self.interface_config['i2c_port']:
# Parse I2C address
# Create I2C config
# Create I2C config (with GlobalClock for timestamps)
i2c_config = I2CConfig(
bus_id=int(self.interface_config["i2c_port"]),
buffer_size=40 * 1024 * 1024 # 40MB buffer
buffer_size=40 * 1024 * 1024, # 40MB buffer
timestamp_source=GlobalClock.instance().now # Use global clock
)
# Create I2C handle

Loading…
Cancel
Save