#!/usr/bin/env python3
"""
Session Module - vzug-e-hinge
==============================
Orchestrates session execution with queued pause/stop behavior.

This module coordinates the entire automated test session flow:
- Loading interface and session profiles from database
- Opening UART/I2C ports based on configuration
- Executing command sequences via run.py
- Managing pause/stop requests (queued after current run)
- Providing real-time status updates via Qt signals
- Handling session lifecycle (create, pause, resume, abort)

Key Features:
- Queued pause/stop (executes AFTER current run completes)
- Countdown during delays with second-by-second updates
- Real-time timestamp correlation (UART/I2C)
- Automatic port management
- Comprehensive error handling
- Database integration with status tracking

Author: Kynsight
Version: 2.0.0
Date: 2025-11-09
"""

# =============================================================================
# IMPORTS
# =============================================================================

from PyQt6.QtCore import QObject, pyqtSignal
from typing import Optional, Dict, List, Any
import sqlite3
import json
import time
from datetime import datetime

# UART core
from uart.uart_kit.uart_core import (
    UARTPort,
    UARTConfig,
    PacketConfig,
    uart_create,
    uart_open,
    uart_close,
    uart_start_reader,
    uart_stop_reader,
    Status as UARTStatus
)

# I2C core
from i2c.i2c_kit.i2c_core import (
    I2CHandle,
    I2CConfig,
    i2c_create,
    i2c_open,
    i2c_close,
    Status as I2CStatus
)

# Run executor
from run import execute_run

# Database manager
from database.init_database import DatabaseManager


# =============================================================================
# SESSION CLASS
# =============================================================================

class Session(QObject):
    """
    Session orchestration class.

    Manages the complete lifecycle of an automated test session:
    1. Load profiles (interface + session) from database
    2. Open hardware ports (UART + I2C)
    3. Execute command sequence
    4. Handle pause/stop requests (queued)
    5. Provide real-time status updates
    6. Close ports and finalize session

    Signals:
        session_started: Emitted when session starts (session_id)
        command_started: Emitted when command starts (command_no, command_name)
        run_completed: Emitted when run completes (run_no, packet_count)
        delay_countdown: Emitted during delay countdown (seconds_remaining)
        session_paused: Emitted when session pauses
        session_finished: Emitted when session completes successfully
        error_occurred: Emitted on error (error_message)
        status_changed: Emitted on status change (status_text)
        raw_data_received: Emitted for raw traffic (direction TX/RX, hex_string)
    """

    # =========================================================================
    # SIGNALS (for GUI updates)
    # =========================================================================

    session_started = pyqtSignal(str)          # session_id
    command_started = pyqtSignal(int, str)     # command_no, command_name
    run_completed = pyqtSignal(int, int)       # run_no, packet_count
    delay_countdown = pyqtSignal(int)          # seconds_remaining
    session_paused = pyqtSignal()
    session_finished = pyqtSignal()
    error_occurred = pyqtSignal(str)           # error_message
    status_changed = pyqtSignal(str)           # status_text
    raw_data_received = pyqtSignal(str, str)   # direction (TX/RX), hex_string

    # =========================================================================
    # CONSTANTS
    # =========================================================================

    # Delay applied after a command when the command does not specify one
    DEFAULT_DELAY_MS = 3000

    # Value passed to execute_run() as stop_timeout_ms
    RUN_STOP_TIMEOUT_MS = 5000

    # =========================================================================
    # CONSTRUCTOR
    # =========================================================================

    def __init__(self, db_manager: DatabaseManager):
        """
        Initialize session manager.

        Args:
            db_manager: Database manager instance
        """
        super().__init__()

        # Database connection
        self.db_manager = db_manager
        self.db_conn = db_manager.get_connection()

        # Session state
        self.current_session_id: Optional[str] = None
        self.session_name: Optional[str] = None
        self.interface_profile_id: Optional[int] = None
        self.session_profile_id: Optional[int] = None

        # Profile data (filled in by load_session)
        self.interface_config: Dict[str, Any] = {}
        self.print_command_rx: bool = False

        # Hardware ports (managed by this class)
        self.uart_command_port: Optional[UARTPort] = None  # For sending commands (TX/RX)
        self.uart_logger_port: Optional[UARTPort] = None   # For telemetry packets (RX only, optional)
        self.i2c_handle: Optional[I2CHandle] = None
        self.packet_config: Optional[PacketConfig] = None

        # Command sequence (loaded from session_profile)
        self.commands: List[Dict[str, Any]] = []
        self.total_commands: int = 0
        self.current_command_index: int = 0

        # Number of commands whose run finished successfully in the current
        # session; the command loop starts from here so that a resume
        # continues with the next command instead of restarting.
        self._completed_commands: int = 0

        # Execution control flags
        self.is_running: bool = False
        self.is_paused: bool = False
        self.pause_queued: bool = False
        self.stop_queued: bool = False

    # =========================================================================
    # PROFILE LOADING
    # =========================================================================

    def load_session(
        self,
        interface_profile_id: int,
        session_profile_id: int,
        session_name: Optional[str] = None
    ) -> bool:
        """
        Load session and interface profiles from database.

        This method:
        1. Reads interface_profile (UART/I2C config, packet detection)
        2. Reads session_profile (command sequence JSON)
        3. Parses command sequence
        4. Validates all commands exist in uart_commands table
        5. Stores configuration for execution

        Everything is staged in local variables and only stored on the
        instance once every step succeeded, so a failed load never leaves a
        half-validated command sequence behind.

        Args:
            interface_profile_id: ID from interface_profiles table
            session_profile_id: ID from session_profiles table
            session_name: Custom session name (auto-generated if None)

        Returns:
            True if successful, False on error (error_occurred is emitted)
        """
        try:
            # ===================================================================
            # 1. Load interface profile (UART/I2C configuration)
            # ===================================================================
            cursor = self.db_conn.execute("""
                SELECT
                    profile_name,
                    uart_command_port, uart_command_baud, uart_command_data_bits,
                    uart_command_stop_bits, uart_command_parity, uart_command_timeout_ms,
                    uart_logger_enable,
                    uart_logger_port, uart_logger_baud, uart_logger_data_bits,
                    uart_logger_stop_bits, uart_logger_parity, uart_logger_timeout_ms,
                    uart_logger_packet_detect_enable, uart_logger_packet_detect_start,
                    uart_logger_packet_detect_length, uart_logger_packet_detect_end,
                    i2c_port, i2c_slave_address, i2c_slave_read_register, i2c_slave_read_length
                FROM interface_profiles
                WHERE profile_id = ?
            """, (interface_profile_id,))
            row = cursor.fetchone()

            if not row:
                self.error_occurred.emit(f"Interface profile {interface_profile_id} not found")
                return False

            interface_config = {
                'profile_name': row[0],
                # UART Command interface
                'uart_command_port': row[1],
                'uart_command_baud': row[2],
                'uart_command_data_bits': row[3],
                'uart_command_stop_bits': row[4],
                'uart_command_parity': row[5],
                'uart_command_timeout_ms': row[6],
                # UART Logger interface (a NULL enable flag means "enabled")
                'uart_logger_enable': bool(row[7]) if row[7] is not None else True,
                'uart_logger_port': row[8],
                'uart_logger_baud': row[9],
                'uart_logger_data_bits': row[10],
                'uart_logger_stop_bits': row[11],
                'uart_logger_parity': row[12],
                'uart_logger_timeout_ms': row[13],
                # Packet detection
                'packet_detect_enable': row[14],
                'packet_detect_start': row[15],
                'packet_detect_length': row[16],
                'packet_detect_end': row[17],
                # I2C configuration
                'i2c_port': row[18],
                'i2c_slave_address': row[19],
                'i2c_slave_read_register': row[20],
                'i2c_slave_read_length': row[21]
            }

            # ===================================================================
            # 2. Load session profile (command sequence)
            # ===================================================================
            cursor = self.db_conn.execute("""
                SELECT profile_name, command_sequence, description, print_command_rx
                FROM session_profiles
                WHERE profile_id = ?
            """, (session_profile_id,))
            row = cursor.fetchone()

            if not row:
                self.error_occurred.emit(f"Session profile {session_profile_id} not found")
                return False

            profile_name = row[0]
            command_sequence_json = row[1]
            print_command_rx = bool(row[3])

            # Parse JSON command sequence
            try:
                command_sequence = json.loads(command_sequence_json)
            except json.JSONDecodeError as e:
                self.error_occurred.emit(f"Invalid JSON in session profile: {str(e)}")
                return False

            if not isinstance(command_sequence, dict):
                self.error_occurred.emit(
                    "Invalid JSON in session profile: expected an object with a 'commands' list"
                )
                return False

            # "commands": null is treated like a missing key
            commands = command_sequence.get('commands') or []

            if not commands:
                self.error_occurred.emit("Session profile has no commands")
                return False

            # ===================================================================
            # 3. Validate all commands exist in database
            # ===================================================================
            for cmd in commands:
                cmd_id = cmd.get('command_id') if isinstance(cmd, dict) else None
                if not cmd_id:
                    self.error_occurred.emit("Command missing command_id")
                    return False

                # Check if command exists
                cursor = self.db_conn.execute("""
                    SELECT command_name, hex_string
                    FROM uart_commands
                    WHERE command_id = ?
                """, (cmd_id,))
                cmd_row = cursor.fetchone()

                if not cmd_row:
                    self.error_occurred.emit(f"UART command {cmd_id} not found")
                    return False

                # Store command details next to the sequence entry
                cmd['command_name'] = cmd_row[0]
                cmd['hex_string'] = cmd_row[1]

            # ===================================================================
            # 4. Commit the validated configuration to the instance
            # ===================================================================
            self.interface_profile_id = interface_profile_id
            self.interface_config = interface_config
            self.session_profile_id = session_profile_id
            self.print_command_rx = print_command_rx
            self.commands = commands
            self.total_commands = len(commands)

            # Generate session name if not provided
            if session_name is None or session_name.strip() == "":
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.session_name = f"{profile_name}_{timestamp}"
            else:
                self.session_name = session_name

            # ===================================================================
            # 5. Emit success status
            # ===================================================================
            self.status_changed.emit(f"Session loaded: {self.session_name}")
            self.status_changed.emit(f"Interface: {self.interface_config['profile_name']}")
            self.status_changed.emit(f"Commands: {self.total_commands}")

            return True

        except Exception as e:
            self.error_occurred.emit(f"Failed to load session: {str(e)}")
            return False

    # =========================================================================
    # PORT MANAGEMENT
    # =========================================================================

    def _open_uart_port(self, role: str, device: Any, baudrate: Any) -> Optional[UARTPort]:
        """
        Create and open one UART port and start its reader thread.

        The port is only returned when all three steps succeeded, so callers
        never hold a handle to a port that is not actually open.

        Args:
            role: "command" or "logger" (used in status/error messages)
            device: Device value from the interface profile
            baudrate: Baud rate value from the interface profile

        Returns:
            The opened port, or None on failure (error_occurred is emitted)
        """
        config = UARTConfig(device=device, baudrate=baudrate)

        status, port = uart_create(config)
        if status != UARTStatus.OK or port is None:
            self.error_occurred.emit(f"Failed to create UART {role} port")
            return None

        status = uart_open(port)
        if status != UARTStatus.OK:
            self.error_occurred.emit(f"Failed to open UART {role} port {config.device}")
            return None

        try:
            status = uart_start_reader(port)
        except Exception:
            # The port is open but not yet stored anywhere - release it
            # before the exception propagates to _open_ports().
            uart_close(port)
            raise

        if status != UARTStatus.OK:
            uart_close(port)
            self.error_occurred.emit(f"Failed to start UART {role} reader")
            return None

        self.status_changed.emit(f"UART {role.capitalize()} opened: {config.device}")
        return port

    @staticmethod
    def _parse_hex_marker(marker_hex: Optional[str]) -> Optional[bytes]:
        """Convert a hex marker string such as "AA 55" to bytes (None if empty)."""
        if not marker_hex:
            return None
        return bytes.fromhex(marker_hex.replace(' ', ''))

    def _build_packet_config(self) -> PacketConfig:
        """
        Create the PacketConfig from the loaded profiles.

        Packet detection is only enabled when the session profile does not
        request print_command_rx, the logger port is enabled, and the
        interface profile enables packet detection.
        """
        cfg = self.interface_config

        if self.print_command_rx:
            # Force packet detection OFF - just print TX/RX to data monitor
            return PacketConfig(enable=False)

        if not cfg['uart_logger_enable']:
            # Logger disabled - no packet detection
            return PacketConfig(enable=False)

        if not cfg['packet_detect_enable']:
            return PacketConfig(enable=False)

        return PacketConfig(
            enable=True,
            start_marker=self._parse_hex_marker(cfg['packet_detect_start']),
            packet_length=cfg['packet_detect_length'],
            end_marker=self._parse_hex_marker(cfg['packet_detect_end']),
            on_packet_callback=None  # Will be set by run.py
        )

    def _open_i2c(self) -> None:
        """
        Open the I2C bus if one is configured.

        I2C is optional: every failure here only emits a warning through
        status_changed and leaves self.i2c_handle as None.
        """
        bus = self.interface_config['i2c_port']

        # NOTE(review): a numeric 0 is falsy and therefore treated as
        # "no I2C configured", same as before - confirm that is intended.
        if not bus:
            return

        try:
            bus_id = int(bus)
        except (TypeError, ValueError):
            self.status_changed.emit(f"Warning: Invalid I2C bus '{bus}'")
            self.i2c_handle = None
            return

        status, self.i2c_handle = i2c_create(I2CConfig(bus_id=bus_id))
        if status != I2CStatus.OK or self.i2c_handle is None:
            self.status_changed.emit("Warning: Could not create I2C handle")
            self.i2c_handle = None
            return

        status = i2c_open(self.i2c_handle)
        if status != I2CStatus.OK:
            self.status_changed.emit("Warning: I2C port not available")
            self.i2c_handle = None
            return

        self.status_changed.emit(f"I2C opened: bus {bus}")

    def _open_ports(self) -> bool:
        """
        Open UART and I2C ports based on interface configuration.

        This method:
        1. Opens UART Command port (TX/RX - for sending commands, always needed)
        2. Opens UART Logger port (RX - for telemetry, optional)
        3. Creates PacketConfig from interface profile
        4. Opens I2C port (for angle readings, optional)

        Returns:
            True if successful, False on error (all ports are released again)
        """
        try:
            cfg = self.interface_config

            # ===================================================================
            # 1. Open UART Command Port (TX/RX - ALWAYS NEEDED)
            # ===================================================================
            self.uart_command_port = self._open_uart_port(
                "command", cfg['uart_command_port'], cfg['uart_command_baud']
            )
            if self.uart_command_port is None:
                return False

            # ===================================================================
            # 2. Open UART Logger Port (RX - OPTIONAL)
            # ===================================================================
            if cfg['uart_logger_enable']:
                self.uart_logger_port = self._open_uart_port(
                    "logger", cfg['uart_logger_port'], cfg['uart_logger_baud']
                )
                if self.uart_logger_port is None:
                    # Close command port before returning
                    uart_stop_reader(self.uart_command_port)
                    uart_close(self.uart_command_port)
                    self.uart_command_port = None
                    return False
            else:
                # Logger disabled - skip logger port
                self.uart_logger_port = None
                self.status_changed.emit("UART Logger disabled (command port only)")

            # ===================================================================
            # 3. Create PacketConfig from interface profile
            # ===================================================================
            self.packet_config = self._build_packet_config()

            # ===================================================================
            # 4. Open I2C Port (optional - for angle readings)
            # ===================================================================
            self._open_i2c()

            return True

        except Exception as e:
            self.error_occurred.emit(f"Failed to open ports: {str(e)}")
            self._close_ports()
            return False

    def _close_ports(self):
        """
        Close UART and I2C ports.

        Called at end of session or on error. Each port is closed
        independently, so a failure while closing one port does not prevent
        the others from being released. Handles are cleared before closing,
        which makes repeated calls harmless.
        """
        # Close UART Command port
        if self.uart_command_port:
            port, self.uart_command_port = self.uart_command_port, None
            try:
                uart_stop_reader(port)
                uart_close(port)
                self.status_changed.emit("UART Command closed")
            except Exception as e:
                self.error_occurred.emit(f"Error closing ports: {str(e)}")

        # Close UART Logger port
        if self.uart_logger_port:
            port, self.uart_logger_port = self.uart_logger_port, None
            try:
                uart_stop_reader(port)
                uart_close(port)
                self.status_changed.emit("UART Logger closed")
            except Exception as e:
                self.error_occurred.emit(f"Error closing ports: {str(e)}")

        # Close I2C
        if self.i2c_handle:
            handle, self.i2c_handle = self.i2c_handle, None
            try:
                i2c_close(handle)
                self.status_changed.emit("I2C closed")
            except Exception as e:
                self.error_occurred.emit(f"Error closing ports: {str(e)}")

    # =========================================================================
    # SESSION EXECUTION
    # =========================================================================

    def start_session(self) -> bool:
        """
        Start session execution.

        This method:
        1. Creates session record in database
        2. Opens hardware ports
        3. Executes command loop
        4. Handles delays with countdown
        5. Checks pause/stop queue between runs
        6. Finalizes session on completion

        The command loop runs synchronously inside this call, so the method
        only returns once the session completed, paused, was stopped or
        failed.

        Returns:
            True if started successfully, False on error
        """
        if self.is_running:
            self.error_occurred.emit("Session already running")
            return False

        if not self.commands:
            self.error_occurred.emit("No session loaded")
            return False

        try:
            # ===================================================================
            # 1. Create session record in database
            # ===================================================================
            now = datetime.now()
            self.current_session_id = f"session_{now.strftime('%Y%m%d_%H%M%S')}"
            session_date = now.strftime("%Y-%m-%d")

            self.db_conn.execute("""
                INSERT INTO sessions (
                    session_id, session_name, session_date,
                    interface_profile_id, session_profile_id,
                    status, total_runs
                ) VALUES (?, ?, ?, ?, ?, 'active', 0)
            """, (
                self.current_session_id,
                self.session_name,
                session_date,
                self.interface_profile_id,
                self.session_profile_id
            ))
            self.db_conn.commit()

            # ===================================================================
            # 2. Open hardware ports
            # ===================================================================
            if not self._open_ports():
                self._finalize_session('error')
                return False

            # ===================================================================
            # 3. Set execution state
            # ===================================================================
            self.is_running = True
            self.is_paused = False
            self.pause_queued = False
            self.stop_queued = False
            self.current_command_index = 0
            self._completed_commands = 0

            # Emit session started
            self.session_started.emit(self.current_session_id)
            self.status_changed.emit(f"Session started: {self.session_name}")

            # ===================================================================
            # 4. Execute command loop
            # ===================================================================
            self._execute_command_loop()

            return True

        except Exception as e:
            self.error_occurred.emit(f"Failed to start session: {str(e)}")
            self._finalize_session('error')
            return False

    def _execute_command_loop(self):
        """
        Execute the remaining commands in sequence.

        This is the main execution loop that:
        1. Iterates through all commands not yet completed
        2. Calls run.py for each command
        3. Handles delays with countdown
        4. Checks pause/stop queue between runs
        5. Updates database and emits signals

        The loop starts after the last successfully completed command, so it
        serves both a fresh start (nothing completed yet) and a resume after
        a pause.

        CRITICAL: Pause/Stop are QUEUED and only execute between runs
        during the delay phase, never during a run itself.
        """
        try:
            first = self._completed_commands

            # Run numbers are 1-based
            for cmd_index, cmd in enumerate(self.commands[first:], first + 1):
                self.current_command_index = cmd_index

                # ===============================================================
                # 1. Check if stop was queued (before starting new run)
                # ===============================================================
                if self.stop_queued:
                    self.status_changed.emit("Session stopped by user")
                    self._finalize_session('aborted')
                    return

                # ===============================================================
                # 2. Emit command started
                # ===============================================================
                command_name = cmd['command_name']
                self.command_started.emit(cmd_index, command_name)
                self.status_changed.emit(
                    f"Command {cmd_index}/{self.total_commands}: {command_name}"
                )

                # ===============================================================
                # 3. Execute run via run.py
                # ===============================================================
                status, packet_count, error_msg = execute_run(
                    db_connection=self.db_conn,
                    session_id=self.current_session_id,
                    session_name=self.session_name,
                    run_no=cmd_index,
                    command_id=cmd['command_id'],
                    command_hex=cmd['hex_string'],
                    uart_command_port=self.uart_command_port,
                    uart_logger_port=self.uart_logger_port,
                    i2c_port=self.i2c_handle,  # Note: run.py calls it i2c_port but it's actually I2CHandle
                    packet_config=self.packet_config,
                    stop_timeout_ms=self.RUN_STOP_TIMEOUT_MS,
                    raw_data_callback=lambda direction, hex_str: self.raw_data_received.emit(direction, hex_str)
                )

                # ===============================================================
                # 4. Handle run result
                # ===============================================================
                if status == "error":
                    # Run failed - abort session
                    self.error_occurred.emit(f"Run {cmd_index} failed: {error_msg}")
                    self._finalize_session('error')
                    return

                # Run succeeded - remember progress and emit completion
                self._completed_commands = cmd_index
                self.run_completed.emit(cmd_index, packet_count)
                self.status_changed.emit(f"Run {cmd_index} complete: {packet_count} packets")

                # Update total runs in database
                self.db_conn.execute("""
                    UPDATE sessions
                    SET total_runs = ?
                    WHERE session_id = ?
                """, (cmd_index, self.current_session_id))
                self.db_conn.commit()

                # ===============================================================
                # 5. Delay between commands (with countdown and queue check)
                # ===============================================================
                # Get delay from command (default if not specified)
                delay_ms = cmd.get('delay_ms', self.DEFAULT_DELAY_MS)

                # Only delay if not last command
                if cmd_index < self.total_commands:
                    self._execute_delay(delay_ms)

                    # Check if pause/stop was queued during delay
                    if self.pause_queued:
                        self._finalize_session('paused')
                        self.is_paused = True
                        self.session_paused.emit()
                        self.status_changed.emit("Session paused")
                        return

                    if self.stop_queued:
                        self._finalize_session('aborted')
                        self.status_changed.emit("Session stopped by user")
                        return

            # ===================================================================
            # 6. All commands completed successfully
            # ===================================================================
            self._finalize_session('completed')
            self.session_finished.emit()
            self.status_changed.emit("Session completed successfully")

        except Exception as e:
            self.error_occurred.emit(f"Exception during session: {str(e)}")
            self._finalize_session('error')

    def _execute_delay(self, delay_ms: int):
        """
        Execute delay with countdown.

        Emits delay_countdown signal every second so GUI can
        display countdown timer. Also checks pause/stop queue
        each second and returns early when one is set.

        Args:
            delay_ms: Delay in milliseconds
        """
        # Convert to seconds
        delay_sec = delay_ms / 1000.0
        whole_seconds = int(delay_sec)

        # Countdown in 1-second steps
        for remaining in range(whole_seconds, 0, -1):
            # Emit countdown
            self.delay_countdown.emit(remaining)
            self.status_changed.emit(f"Waiting... ({remaining}s remaining)")

            # Sleep for 1 second
            time.sleep(1.0)

            # Check if pause/stop was queued
            if self.pause_queued or self.stop_queued:
                return  # Exit delay early

        # Handle fractional second at end
        fractional = delay_sec - whole_seconds
        if fractional > 0:
            time.sleep(fractional)

    def _finalize_session(self, status: str):
        """
        Finalize session and cleanup.

        Writes the final status to the database, closes the hardware ports
        and resets the execution state. Port cleanup and the state reset run
        even when the database update fails, so a database error cannot leave
        ports open or the session stuck in the "running" state.

        Args:
            status: 'completed', 'paused', 'aborted', or 'error'
        """
        try:
            # Update session status in database
            self.db_conn.execute("""
                UPDATE sessions
                SET status = ?, ended_at = datetime('now')
                WHERE session_id = ?
            """, (status, self.current_session_id))
            self.db_conn.commit()

        except Exception as e:
            self.error_occurred.emit(f"Error finalizing session: {str(e)}")

        finally:
            # Close hardware ports
            self._close_ports()

            # Reset execution state (unless paused - a paused session keeps
            # its ID and running flag so that it can be resumed)
            if status != 'paused':
                self.is_running = False
                self.is_paused = False
                self.current_session_id = None

    # =========================================================================
    # PAUSE/RESUME/STOP CONTROL
    # =========================================================================

    def pause_session(self):
        """
        Queue pause request.

        CRITICAL: Pause is QUEUED and executes AFTER current run completes.
        User can press pause anytime, but it only takes effect during
        the delay phase between runs.
        """
        if not self.is_running or self.is_paused:
            return

        self.pause_queued = True
        self.status_changed.emit("Pause queued (will execute after current run)")

    def resume_session(self):
        """
        Resume paused session.

        Continues execution from where it left off, immediately
        proceeding to the next command without delay. If the database
        update or reopening the ports fails, the session is finalized
        with status 'error' instead of being left half-resumed.
        """
        if not self.is_paused:
            return

        # Clear pause flags
        self.is_paused = False
        self.pause_queued = False

        # Update database
        try:
            self.db_conn.execute("""
                UPDATE sessions
                SET status = 'active'
                WHERE session_id = ?
            """, (self.current_session_id,))
            self.db_conn.commit()
        except Exception as e:
            self.error_occurred.emit(f"Failed to resume session: {str(e)}")
            self._finalize_session('error')
            return

        # Reopen ports (they were closed when the session paused)
        if not self._open_ports():
            self.error_occurred.emit("Failed to reopen ports")
            self._finalize_session('error')
            return

        # Resume command loop from where we left off
        self.status_changed.emit("Session resumed")
        self._execute_command_loop()

    def stop_session(self):
        """
        Queue stop request.

        CRITICAL: Stop is QUEUED and executes AFTER current run completes.
        User can press stop anytime, but it only takes effect during
        the delay phase between runs.

        A paused session has no run in progress and no loop that could pick
        up a queued request, so it is aborted immediately.
        """
        if not self.is_running:
            return

        if self.is_paused:
            self.is_paused = False
            self.pause_queued = False
            self._finalize_session('aborted')
            self.status_changed.emit("Session stopped by user")
            return

        self.stop_queued = True
        self.status_changed.emit("Stop queued (will execute after current run)")

    # =========================================================================
    # STATUS QUERIES
    # =========================================================================

    def get_current_session_id(self) -> Optional[str]:
        """Get current session ID."""
        return self.current_session_id

    def get_session_name(self) -> Optional[str]:
        """Get current session name."""
        return self.session_name

    def is_session_running(self) -> bool:
        """Check if session is running."""
        return self.is_running

    def is_session_paused(self) -> bool:
        """Check if session is paused."""
        return self.is_paused


# =============================================================================
# MAIN (for testing)
# =============================================================================

if __name__ == "__main__":
    print("Session Module - vzug-e-hinge")
    print("=" * 60)
    print()
    print("This module orchestrates automated test sessions.")
    print()
    print("Features:")
    print("  ✓ Profile loading (interface + session)")
    print("  ✓ Automatic port management")
    print("  ✓ Command sequence execution")
    print("  ✓ Queued pause/stop (executes after current run)")
    print("  ✓ Real-time status updates via signals")
    print("  ✓ Comprehensive error handling")
    print()
    print("Usage:")
    print("  from session import Session")
    print("  session = Session(db_manager)")
    print("  session.load_session(interface_id, session_id)")
    print("  session.start_session()")
    print()
    print("Ready to be used by session_widget.py!")