You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
997 lines
41 KiB
997 lines
41 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Session Module - vzug-e-hinge
|
|
==============================
|
|
Orchestrates session execution with queued pause/stop behavior.
|
|
|
|
This module coordinates the entire automated test session flow:
|
|
- Loading interface and session profiles from database
|
|
- Opening UART/I2C ports based on configuration
|
|
- Executing command sequences via run.py
|
|
- Managing pause/stop requests (queued after current run)
|
|
- Providing real-time status updates via Qt signals
|
|
- Handling session lifecycle (create, pause, resume, abort)
|
|
|
|
Key Features:
|
|
- Queued pause/stop (executes AFTER current run completes)
|
|
- Countdown during delays with second-by-second updates
|
|
- Real-time timestamp correlation (UART/I2C)
|
|
- Automatic port management
|
|
- Comprehensive error handling
|
|
- Database integration with status tracking
|
|
|
|
Author: Kynsight
|
|
Version: 2.0.0
|
|
Date: 2025-11-09
|
|
"""
|
|
|
|
# =============================================================================
|
|
# IMPORTS
|
|
# =============================================================================
|
|
|
|
from PyQt6.QtCore import QObject, pyqtSignal
|
|
from typing import Optional, Dict, List, Any
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# UART core
|
|
from uart.uart_kit.uart_core import (
|
|
UARTPort,
|
|
UARTConfig,
|
|
PacketConfig,
|
|
uart_create,
|
|
uart_open,
|
|
uart_close,
|
|
uart_start_reader,
|
|
uart_stop_reader,
|
|
Status as UARTStatus
|
|
)
|
|
|
|
# I2C core
|
|
from i2c.i2c_kit.i2c_core import (
|
|
I2CHandle,
|
|
I2CConfig,
|
|
i2c_create,
|
|
i2c_open,
|
|
i2c_close,
|
|
Status as I2CStatus
|
|
)
|
|
|
|
# Run executor
|
|
from run import execute_run
|
|
|
|
# Database manager
|
|
from database.init_database import DatabaseManager
|
|
|
|
|
|
# =============================================================================
|
|
# SESSION CLASS
|
|
# =============================================================================
|
|
|
|
class Session(QObject):
    """
    Session orchestration class.

    Manages the complete lifecycle of an automated test session:
    1. Load profiles (interface + session) from database
    2. Open hardware ports (UART + I2C)
    3. Execute command sequence
    4. Handle pause/stop requests (queued)
    5. Provide real-time status updates
    6. Close ports and finalize session

    Signals:
        session_started: Emitted when session starts (session_id)
        command_started: Emitted when command starts (command_no, command_name)
        run_completed: Emitted when run completes (run_no, packet_count)
        delay_countdown: Emitted during delay countdown (seconds_remaining)
        session_paused: Emitted when session pauses
        session_finished: Emitted when session completes successfully
        error_occurred: Emitted on error (error_message)
        status_changed: Emitted on status change (status_text)
        raw_data_received: Emitted with raw traffic forwarded from a run
            (direction "TX"/"RX", hex_string)
    """

    # =========================================================================
    # SIGNALS (for GUI updates)
    # =========================================================================

    session_started = pyqtSignal(str)  # session_id
    command_started = pyqtSignal(int, str)  # command_no, command_name
    run_completed = pyqtSignal(int, int)  # run_no, packet_count
    delay_countdown = pyqtSignal(int)  # seconds_remaining
    session_paused = pyqtSignal()
    session_finished = pyqtSignal()
    error_occurred = pyqtSignal(str)  # error_message
    status_changed = pyqtSignal(str)  # status_text
    raw_data_received = pyqtSignal(str, str)  # direction (TX/RX), hex_string
|
|
|
|
# =========================================================================
|
|
# CONSTRUCTOR
|
|
# =========================================================================
|
|
|
|
def __init__(self, db_manager: DatabaseManager):
    """
    Initialize session manager.

    All state used by the other methods is declared here so that every
    attribute exists before a session has been loaded.

    Args:
        db_manager: Database manager instance; its get_connection()
            result is kept as the connection used for all queries.
    """
    super().__init__()

    # Database connection
    self.db_manager = db_manager
    self.db_conn = db_manager.get_connection()

    # Session state
    self.current_session_id: Optional[str] = None
    self.session_name: Optional[str] = None
    self.interface_profile_id: Optional[int] = None
    self.session_profile_id: Optional[int] = None

    # Interface configuration (filled by load_session). Declared here so
    # port/run code never hits a missing attribute before a profile is loaded.
    self.interface_config: Dict[str, Any] = {}

    # Hardware ports (managed by this class)
    self.uart_command_port: Optional[UARTPort] = None  # For sending commands (TX/RX)
    self.uart_logger_port: Optional[UARTPort] = None   # For telemetry packets (RX only, optional)
    self.i2c_handle: Optional[I2CHandle] = None
    self.packet_config: Optional[PacketConfig] = None

    # Phase execution (multi-phase support: Init, Execute, De-init)
    self.phases: List[Dict[str, Any]] = []  # List of phase configs
    self.total_commands: int = 0            # Total commands across all phases
    self.current_command_index: int = 0     # Global command counter

    # Execution control flags
    self.is_running: bool = False
    self.is_paused: bool = False
    self.pause_queued: bool = False
    self.stop_queued: bool = False
|
|
|
|
# =========================================================================
|
|
# PROFILE LOADING
|
|
# =========================================================================
|
|
|
|
def load_session(
    self,
    interface_profile_id: int,
    init_session_id: Optional[int] = None,
    execute_session_id: Optional[int] = None,
    deinit_session_id: Optional[int] = None,
    session_name: Optional[str] = None
) -> bool:
    """
    Load session and interface profiles from database with 3 phases.

    This method:
    1. Reads interface_profile (UART/I2C config, packet detection)
    2. Reads up to 3 session_profiles (Init, Execute, De-init)
    3. Stores the phase configurations and the session name
    4. Emits status messages summarising what was loaded

    Phases are collected locally and only published to ``self.phases``
    once every requested phase loaded successfully. If any phase fails,
    ``self.phases`` is left empty so a half-loaded session cannot be started.

    Args:
        interface_profile_id: ID from interface_profiles table
        init_session_id: ID for Init phase (optional, None to skip)
        execute_session_id: ID for Execute phase (optional, None to skip)
        deinit_session_id: ID for De-init phase (optional, None to skip)
        session_name: Custom session name (auto-generated if None or blank)

    Returns:
        True if successful, False on error (an error_occurred signal
        describes the failure)
    """
    # Keys of self.interface_config, in the same order as the SELECT columns.
    config_keys = (
        'profile_name',
        # UART Command interface
        'uart_command_port', 'uart_command_baud', 'uart_command_data_bits',
        'uart_command_stop_bits', 'uart_command_parity', 'uart_command_timeout_ms',
        # UART Logger interface
        'uart_logger_enable', 'uart_logger_port', 'uart_logger_baud',
        'uart_logger_data_bits', 'uart_logger_stop_bits', 'uart_logger_parity',
        'uart_logger_timeout_ms', 'uart_logger_grace_ms',
        # Packet detection
        'packet_detect_enable', 'packet_detect_start',
        'packet_detect_length', 'packet_detect_end',
        # I2C configuration
        'i2c_port', 'i2c_slave_address',
        'i2c_slave_read_register', 'i2c_slave_read_length',
    )

    try:
        # ===================================================================
        # 1. Load interface profile (UART/I2C configuration)
        # ===================================================================
        cursor = self.db_conn.execute("""
            SELECT
                profile_name,
                uart_command_port, uart_command_baud, uart_command_data_bits,
                uart_command_stop_bits, uart_command_parity, uart_command_timeout_ms,
                uart_logger_enable, uart_logger_port, uart_logger_baud, uart_logger_data_bits,
                uart_logger_stop_bits, uart_logger_parity, uart_logger_timeout_ms,
                uart_logger_grace_ms,
                uart_logger_packet_detect_enable, uart_logger_packet_detect_start,
                uart_logger_packet_detect_length, uart_logger_packet_detect_end,
                i2c_port, i2c_slave_address, i2c_slave_read_register, i2c_slave_read_length
            FROM interface_profiles
            WHERE profile_id = ?
        """, (interface_profile_id,))

        row = cursor.fetchone()
        if not row:
            self.error_occurred.emit(f"Interface profile {interface_profile_id} not found")
            return False

        # Store interface configuration
        self.interface_profile_id = interface_profile_id
        self.interface_config = dict(zip(config_keys, row))
        # A NULL enable flag means "logger enabled" (default on).
        self.interface_config['uart_logger_enable'] = (
            bool(row[7]) if row[7] is not None else True
        )

        # ===================================================================
        # 2. Load session profiles (up to 3 phases: Init, Execute, De-init)
        # ===================================================================

        # Drop any previously loaded phases first; the new ones are only
        # published below after all of them validated.
        self.phases = []
        self.total_commands = 0

        requested_phases = (
            ("Init", init_session_id),
            ("Execute", execute_session_id),
            ("De-init", deinit_session_id),
        )

        loaded_phases = []
        for phase_name, profile_id in requested_phases:
            if profile_id is None:
                continue
            success, phase_config = self._load_phase_profile(profile_id, phase_name)
            if not success:
                return False
            # phase_config is None for a profile without commands (phase skipped)
            if phase_config:
                loaded_phases.append(phase_config)

        # Check at least one phase loaded
        if not loaded_phases:
            self.error_occurred.emit("No session phases selected")
            return False

        self.phases = loaded_phases
        self.total_commands = sum(len(p['commands']) for p in loaded_phases)

        # ===================================================================
        # 3. Generate session name if not provided
        # ===================================================================
        if session_name is None or session_name.strip() == "":
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            phase_names = "_".join(p['name'] for p in self.phases)
            self.session_name = f"{phase_names}_{timestamp}"
        else:
            self.session_name = session_name

        # ===================================================================
        # 4. Emit success status
        # ===================================================================
        phase_summary = ", ".join(
            f"{p['name']}({len(p['commands'])} cmds)" for p in self.phases
        )
        self.status_changed.emit(f"Multi-phase session loaded: {self.session_name}")
        self.status_changed.emit(f"Interface: {self.interface_config['profile_name']}")
        self.status_changed.emit(f"Phases: {phase_summary}")
        self.status_changed.emit(f"Total commands: {self.total_commands}")

        return True

    except Exception as e:
        self.error_occurred.emit(f"Failed to load session: {str(e)}")
        return False
|
|
|
|
def _load_phase_profile(self, profile_id: int, phase_name: str) -> tuple[bool, Optional[dict]]:
    """
    Load a single session profile for a phase.

    Reads the profile row, parses its JSON command sequence, and enriches
    every command with ``command_name`` and ``hex_string`` looked up in the
    uart_commands table.

    Args:
        profile_id: Session profile ID
        phase_name: Phase name ("Init", "Execute", or "De-init"); used as
            a prefix in emitted messages and stored in the phase config

    Returns:
        (success, phase_config) where phase_config is:
        {'name': str, 'profile_name': str, 'profile_id': int,
         'commands': list, 'print_command_rx': bool}
        or None if the profile has no commands (phase is skipped) or
        loading failed (success is False and error_occurred was emitted)
    """
    try:
        profile_row = self.db_conn.execute("""
            SELECT profile_name, command_sequence, description, print_command_rx
            FROM session_profiles
            WHERE profile_id = ?
        """, (profile_id,)).fetchone()

        if not profile_row:
            self.error_occurred.emit(f"{phase_name} session profile {profile_id} not found")
            return (False, None)

        profile_name = profile_row[0]
        command_sequence_json = profile_row[1]
        print_command_rx = bool(profile_row[3]) if len(profile_row) > 3 else False

        # Parse JSON command sequence. TypeError covers a NULL column value.
        try:
            command_sequence = json.loads(command_sequence_json)
        except (json.JSONDecodeError, TypeError) as e:
            self.error_occurred.emit(f"{phase_name}: Invalid JSON - {str(e)}")
            return (False, None)

        # The sequence must be an object holding a list under 'commands';
        # reject anything else explicitly instead of failing on .get()/len().
        if not isinstance(command_sequence, dict):
            self.error_occurred.emit(
                f"{phase_name}: Command sequence must be a JSON object with a 'commands' list"
            )
            return (False, None)

        commands = command_sequence.get('commands', [])
        if not isinstance(commands, list):
            self.error_occurred.emit(f"{phase_name}: 'commands' must be a list")
            return (False, None)

        # Empty profile is okay (skip phase)
        if not commands:
            self.status_changed.emit(f"{phase_name} phase has no commands, skipping")
            return (True, None)

        # Validate all commands exist in database
        for cmd in commands:
            cmd_id = cmd.get('command_id') if isinstance(cmd, dict) else None
            if not cmd_id:
                self.error_occurred.emit(f"{phase_name}: Command missing command_id")
                return (False, None)

            # Check if command exists and load details
            command_row = self.db_conn.execute("""
                SELECT command_name, hex_string
                FROM uart_commands
                WHERE command_id = ?
            """, (cmd_id,)).fetchone()

            if not command_row:
                self.error_occurred.emit(f"{phase_name}: UART command {cmd_id} not found")
                return (False, None)

            # Store command details next to the id/delay from the profile
            cmd['command_name'] = command_row[0]
            cmd['hex_string'] = command_row[1]

        # Create phase config
        phase_config = {
            'name': phase_name,
            'profile_name': profile_name,
            'profile_id': profile_id,
            'commands': commands,
            'print_command_rx': print_command_rx
        }

        return (True, phase_config)

    except Exception as e:
        self.error_occurred.emit(f"{phase_name}: Failed to load - {str(e)}")
        return (False, None)
|
|
|
|
# =========================================================================
|
|
# PORT MANAGEMENT
|
|
# =========================================================================
|
|
|
|
def _open_ports(self) -> bool:
    """
    Open the hardware interfaces described by self.interface_config.

    Steps, in order:
    1. UART Command port (TX/RX) - create, open, start reader. Any
       failure here aborts with False.
    2. UART Logger port (RX) - only when 'uart_logger_enable' is set.
       A failure releases the already opened command port and aborts.
    3. PacketConfig - detection is enabled only when no phase requests
       print_command_rx, the logger is enabled and the interface profile
       enables packet detection; otherwise a disabled config is stored.
    4. I2C - only when 'i2c_port' is set. Failures are reported as
       warnings through status_changed and do not abort.

    Returns:
        True if successful, False on error (error_occurred is emitted)
    """
    buffer_bytes = 40 * 1024 * 1024  # 40MB buffer, shared by all interfaces

    def release_command_port():
        # Undo step 1 when a later mandatory step fails.
        uart_stop_reader(self.uart_command_port)
        uart_close(self.uart_command_port)
        self.uart_command_port = None

    def parse_marker(hex_text):
        # Hex marker string (spaces allowed) -> bytes; empty/None -> None.
        if not hex_text:
            return None
        return bytes.fromhex(hex_text.replace(' ', ''))

    try:
        cfg = self.interface_config

        # ---------------------------------------------------------------
        # 1. UART Command port (always needed)
        # ---------------------------------------------------------------
        cmd_uart_config = UARTConfig(
            device=cfg['uart_command_port'],
            baudrate=cfg['uart_command_baud'],
            data_bits=cfg['uart_command_data_bits'],
            stop_bits=cfg['uart_command_stop_bits'],
            parity=cfg['uart_command_parity'],
            buffer_size=buffer_bytes,
            stop_timeout_ms=cfg['uart_command_timeout_ms']
        )

        status, self.uart_command_port = uart_create(cmd_uart_config)
        if status != UARTStatus.OK or self.uart_command_port is None:
            self.error_occurred.emit("Failed to create UART command port")
            return False

        if uart_open(self.uart_command_port) != UARTStatus.OK:
            self.error_occurred.emit(f"Failed to open UART command port {cmd_uart_config.device}")
            return False

        if uart_start_reader(self.uart_command_port) != UARTStatus.OK:
            uart_close(self.uart_command_port)
            self.uart_command_port = None
            self.error_occurred.emit("Failed to start UART command reader")
            return False

        self.status_changed.emit(f"UART Command opened: {cmd_uart_config.device}")

        # ---------------------------------------------------------------
        # 2. UART Logger port (optional)
        # ---------------------------------------------------------------
        if not cfg['uart_logger_enable']:
            self.uart_logger_port = None
            self.status_changed.emit("UART Logger disabled (command port only)")
        else:
            log_uart_config = UARTConfig(
                device=cfg['uart_logger_port'],
                baudrate=cfg['uart_logger_baud'],
                data_bits=cfg['uart_logger_data_bits'],
                stop_bits=cfg['uart_logger_stop_bits'],
                parity=cfg['uart_logger_parity'],
                buffer_size=buffer_bytes,
                stop_timeout_ms=cfg['uart_logger_timeout_ms'],
                grace_timeout_ms=cfg['uart_logger_grace_ms'],
                polling_mode=True  # Enable grace period for first byte
            )

            status, self.uart_logger_port = uart_create(log_uart_config)
            if status != UARTStatus.OK or self.uart_logger_port is None:
                self.error_occurred.emit("Failed to create UART logger port")
                release_command_port()
                return False

            if uart_open(self.uart_logger_port) != UARTStatus.OK:
                self.error_occurred.emit(f"Failed to open UART logger port {log_uart_config.device}")
                release_command_port()
                return False

            if uart_start_reader(self.uart_logger_port) != UARTStatus.OK:
                uart_close(self.uart_logger_port)
                self.uart_logger_port = None
                release_command_port()
                self.error_occurred.emit("Failed to start UART logger reader")
                return False

            self.status_changed.emit(f"UART Logger opened: {log_uart_config.device}")

        # ---------------------------------------------------------------
        # 3. PacketConfig
        # ---------------------------------------------------------------
        # A phase with print_command_rx forces detection off (raw TX/RX is
        # printed instead); a disabled logger has nothing to detect on.
        rx_print_requested = any(
            phase.get('print_command_rx', False) for phase in self.phases
        )
        detection_on = (
            not rx_print_requested
            and cfg['uart_logger_enable']
            and cfg['packet_detect_enable']
        )

        if detection_on:
            start_marker = parse_marker(cfg['packet_detect_start'])
            end_marker = parse_marker(cfg['packet_detect_end'])
            self.packet_config = PacketConfig(
                enable=True,
                start_marker=start_marker,
                packet_length=cfg['packet_detect_length'],
                end_marker=end_marker,
                on_packet_callback=None  # Will be set by run.py
            )
        else:
            self.packet_config = PacketConfig(enable=False)

        # ---------------------------------------------------------------
        # 4. I2C (optional - failures only warn)
        # ---------------------------------------------------------------
        if cfg['i2c_port']:
            i2c_config = I2CConfig(
                bus_id=int(cfg["i2c_port"]),
                buffer_size=buffer_bytes
            )

            status, self.i2c_handle = i2c_create(i2c_config)
            if status != I2CStatus.OK or self.i2c_handle is None:
                self.status_changed.emit("Warning: Could not create I2C handle")
                self.i2c_handle = None
            elif i2c_open(self.i2c_handle) != I2CStatus.OK:
                self.status_changed.emit("Warning: I2C port not available")
                self.i2c_handle = None
            else:
                self.status_changed.emit(f"I2C opened: bus {cfg['i2c_port']}")

        return True

    except Exception as e:
        self.error_occurred.emit(f"Failed to open ports: {str(e)}")
        self._close_ports()
        return False
|
|
|
|
def _close_ports(self):
    """
    Close UART and I2C ports.

    Called at end of session or on error.

    Each interface is shut down independently: a failure while closing
    one port is reported through error_occurred and does not prevent the
    remaining ports from being closed. The stored handle is cleared in
    every case so a failed close is not retried on a stale handle.
    """
    # Close UART Command port
    if self.uart_command_port:
        try:
            uart_stop_reader(self.uart_command_port)
            uart_close(self.uart_command_port)
            self.uart_command_port = None
            self.status_changed.emit("UART Command closed")
        except Exception as e:
            self.error_occurred.emit(f"Error closing ports: {str(e)}")
        finally:
            self.uart_command_port = None

    # Close UART Logger port
    if self.uart_logger_port:
        try:
            uart_stop_reader(self.uart_logger_port)
            uart_close(self.uart_logger_port)
            self.uart_logger_port = None
            self.status_changed.emit("UART Logger closed")
        except Exception as e:
            self.error_occurred.emit(f"Error closing ports: {str(e)}")
        finally:
            self.uart_logger_port = None

    # Close I2C
    if self.i2c_handle:
        try:
            i2c_close(self.i2c_handle)
            self.i2c_handle = None
            self.status_changed.emit("I2C closed")
        except Exception as e:
            self.error_occurred.emit(f"Error closing ports: {str(e)}")
        finally:
            self.i2c_handle = None
|
|
|
|
# =========================================================================
|
|
# SESSION EXECUTION
|
|
# =========================================================================
|
|
|
|
def start_session(self) -> bool:
    """
    Start session execution.

    This method:
    1. Creates session record in database (status 'active')
    2. Opens hardware ports
    3. Resets the execution flags and emits session_started
    4. Runs the command loop (this call returns only after the loop
       has finished, paused, stopped or failed)

    Returns:
        True once the command loop was entered, False if the session
        could not be started (already running, nothing loaded, ports
        unavailable or an exception); error_occurred describes why
    """
    if self.is_running:
        self.error_occurred.emit("Session already running")
        return False

    if not getattr(self, 'phases', None):
        self.error_occurred.emit("No session loaded")
        return False

    try:
        # ===================================================================
        # 1. Create session record in database
        # ===================================================================

        # Take the clock once so the id and the date column always agree,
        # even when the session is started right at midnight.
        started_at = datetime.now()
        timestamp = started_at.strftime("%Y%m%d_%H%M%S")
        self.current_session_id = f"session_{timestamp}"
        session_date = started_at.strftime("%Y-%m-%d")

        # NOTE: Database schema currently supports single session_profile_id
        # We use the first phase's profile_id as the primary reference
        # All phases and their profiles are tracked in self.phases list
        first_phase_profile_id = self.phases[0]['profile_id']

        self.db_conn.execute("""
            INSERT INTO sessions (
                session_id, session_name, session_date,
                interface_profile_id, session_profile_id,
                status, total_runs
            ) VALUES (?, ?, ?, ?, ?, 'active', 0)
        """, (
            self.current_session_id,
            self.session_name,
            session_date,
            self.interface_profile_id,
            first_phase_profile_id
        ))
        self.db_conn.commit()

        # ===================================================================
        # 2. Open hardware ports
        # ===================================================================
        if not self._open_ports():
            self._finalize_session('error')
            return False

        # ===================================================================
        # 3. Set execution state
        # ===================================================================
        self.is_running = True
        self.is_paused = False
        self.pause_queued = False
        self.stop_queued = False
        self.current_command_index = 0

        # Emit session started
        self.session_started.emit(self.current_session_id)
        self.status_changed.emit(f"Session started: {self.session_name}")

        # ===================================================================
        # 4. Execute command loop
        # ===================================================================
        self._execute_command_loop()

        return True

    except Exception as e:
        self.error_occurred.emit(f"Failed to start session: {str(e)}")
        self._finalize_session('error')
        return False
|
|
|
|
def _execute_command_loop(self):
    """
    Execute all commands in sequence across multiple phases.

    This is the main execution loop that:
    1. Iterates through all phases (Init, Execute, De-init)
    2. Iterates through commands within each phase
    3. Calls run.py (execute_run) for each command
    4. Handles delays with countdown
    5. Checks pause/stop queue between runs
    6. Updates database and emits signals

    Resuming: on entry, self.current_command_index is taken as the number
    of runs that already completed (0 for a fresh start, the last completed
    run number after a pause). Those commands are skipped so a resumed
    session continues with the next command instead of starting over.

    CRITICAL: Pause/Stop are QUEUED and only execute between runs
    during the delay phase, never during a run itself. Stop cancels
    ALL remaining phases.
    """
    try:
        # Runs 1..resume_after were completed before a pause.
        resume_after = self.current_command_index
        total_phases = len(self.phases)

        # Loop-invariant run parameters. Addresses are stored as hex text;
        # missing values fall back to 0x40 / 0xFE.
        config = self.interface_config
        i2c_address = (
            int(config['i2c_slave_address'], 16)
            if config.get('i2c_slave_address') else 0x40
        )
        i2c_register = (
            int(config['i2c_slave_read_register'], 16)
            if config.get('i2c_slave_read_register') else 0xFE
        )

        def forward_raw_data(direction, hex_str):
            # Relay raw TX/RX traffic from the run to the GUI.
            self.raw_data_received.emit(direction, hex_str)

        global_cmd_index = 0  # Track overall command number

        # Loop through phases
        for phase_index, phase in enumerate(self.phases, 1):
            phase_name = phase['name']
            phase_commands = phase['commands']

            # Phase finished entirely before the pause: skip it silently.
            if phase_commands and global_cmd_index + len(phase_commands) <= resume_after:
                global_cmd_index += len(phase_commands)
                continue

            # Emit phase started
            self.status_changed.emit(f"Starting Phase {phase_index}/{total_phases}: {phase_name}")

            # Loop through commands in this phase
            for cmd_index_in_phase, cmd in enumerate(phase_commands, 1):
                global_cmd_index += 1

                # Already executed before the pause
                if global_cmd_index <= resume_after:
                    continue

                self.current_command_index = global_cmd_index

                # ===========================================================
                # 1. Check if stop was queued (before starting new run)
                # ===========================================================
                if self.stop_queued:
                    self.status_changed.emit("Session stopped by user")
                    self._finalize_session('aborted')
                    return

                # ===========================================================
                # 2. Emit command started
                # ===========================================================
                command_name = cmd['command_name']
                self.command_started.emit(global_cmd_index, command_name)
                self.status_changed.emit(
                    f"[{phase_name}] Command {cmd_index_in_phase}/{len(phase_commands)} "
                    f"(Total: {global_cmd_index}/{self.total_commands}): {command_name}"
                )

                # ===========================================================
                # 3. Execute run via run.py
                # ===========================================================
                status, packet_count, error_msg = execute_run(
                    db_connection=self.db_conn,
                    session_id=self.current_session_id,
                    session_name=self.session_name,
                    run_no=global_cmd_index,
                    command_id=cmd['command_id'],
                    command_hex=cmd['hex_string'],
                    uart_command_port=self.uart_command_port,
                    uart_logger_port=self.uart_logger_port,
                    i2c_port=self.i2c_handle,
                    packet_config=self.packet_config,
                    i2c_address=i2c_address,
                    i2c_register=i2c_register,
                    stop_timeout_ms=config['uart_logger_timeout_ms'],
                    grace_timeout_ms=config['uart_logger_grace_ms'],
                    raw_data_callback=forward_raw_data
                )

                # ===========================================================
                # 4. Handle run result
                # ===========================================================
                if status == "error":
                    # Run failed - abort session (all phases)
                    self.error_occurred.emit(f"[{phase_name}] Run {global_cmd_index} failed: {error_msg}")
                    self._finalize_session('error')
                    return

                # Run succeeded - emit completion
                self.run_completed.emit(global_cmd_index, packet_count)
                self.status_changed.emit(f"[{phase_name}] Run {global_cmd_index} complete: {packet_count} packets")

                # Update total runs in database
                self.db_conn.execute("""
                    UPDATE sessions SET total_runs = ? WHERE session_id = ?
                """, (global_cmd_index, self.current_session_id))
                self.db_conn.commit()

                # ===========================================================
                # 5. Delay between commands (with countdown and queue check)
                # ===========================================================

                # Get delay from command (default 3000ms if not specified)
                delay_ms = cmd.get('delay_ms', 3000)

                # Only delay if not last command overall
                if global_cmd_index < self.total_commands:
                    self._execute_delay(delay_ms)

                    # Check if pause/stop was queued during delay
                    if self.pause_queued:
                        self._finalize_session('paused')
                        self.is_paused = True
                        self.session_paused.emit()
                        self.status_changed.emit("Session paused")
                        return

                    if self.stop_queued:
                        self._finalize_session('aborted')
                        self.status_changed.emit("Session stopped by user")
                        return

            # Phase completed
            self.status_changed.emit(f"Phase {phase_index}/{total_phases} completed: {phase_name}")

        # ===================================================================
        # 6. All phases completed successfully
        # ===================================================================
        self._finalize_session('completed')
        self.session_finished.emit()
        phase_list = ", ".join(p['name'] for p in self.phases)
        self.status_changed.emit(f"All phases completed successfully: {phase_list}")

    except Exception as e:
        self.error_occurred.emit(f"Exception during session: {str(e)}")
        self._finalize_session('error')
|
|
|
|
def _execute_delay(self, delay_ms: int):
    """
    Execute delay with countdown.

    Emits delay_countdown (and a matching status_changed message) once
    per remaining whole second so the GUI can display a countdown timer.
    The pause/stop queue is checked before every sleep, so a request that
    was queued while the preceding run was executing ends the delay
    immediately instead of after the first one-second sleep.

    Args:
        delay_ms: Delay in milliseconds
    """
    # Convert to seconds
    delay_sec = delay_ms / 1000.0
    whole_seconds = int(delay_sec)

    # Countdown in 1-second steps
    for remaining in range(whole_seconds, 0, -1):
        # Exit delay early when pause/stop was queued
        if self.pause_queued or self.stop_queued:
            return

        # Emit countdown
        self.delay_countdown.emit(remaining)
        self.status_changed.emit(f"Waiting... ({remaining}s remaining)")

        # Sleep for 1 second
        time.sleep(1.0)

    if self.pause_queued or self.stop_queued:
        return

    # Handle fractional second at end
    fractional = delay_sec - whole_seconds
    if fractional > 0:
        time.sleep(fractional)
|
|
|
|
def _finalize_session(self, status: str):
    """
    Finalize session and cleanup.

    Writes the final status (and ended_at) to the sessions table, closes
    the hardware ports and, unless the session is merely paused, resets
    the running state. The three steps are independent: a database error
    is reported through error_occurred but never prevents the ports from
    being closed or the running state from being reset.

    Args:
        status: 'completed', 'paused', 'aborted', or 'error'
    """
    # Update session status in database
    try:
        self.db_conn.execute("""
            UPDATE sessions
            SET status = ?, ended_at = datetime('now')
            WHERE session_id = ?
        """, (status, self.current_session_id))
        self.db_conn.commit()
    except Exception as e:
        self.error_occurred.emit(f"Error finalizing session: {str(e)}")

    # Close hardware ports
    try:
        self._close_ports()
    except Exception as e:
        self.error_occurred.emit(f"Error finalizing session: {str(e)}")

    # Reset execution state (unless paused, which keeps the session id
    # and running flag so the session can be resumed)
    if status != 'paused':
        self.is_running = False
        self.current_session_id = None
|
|
|
|
# =========================================================================
|
|
# PAUSE/RESUME/STOP CONTROL
|
|
# =========================================================================
|
|
|
|
def pause_session(self):
    """
    Request a pause of the running session.

    The request is only recorded (pause_queued); it is acted upon by the
    command loop after the current run, during the delay between runs.
    Nothing happens when no session is running or it is already paused.
    """
    can_queue_pause = self.is_running and not self.is_paused
    if can_queue_pause:
        self.pause_queued = True
        self.status_changed.emit("Pause queued (will execute after current run)")
|
|
|
|
def resume_session(self):
    """
    Resume paused session.

    Reopens the hardware ports, marks the session 'active' in the
    database and re-enters the command loop. The pause state is only
    cleared after both steps succeeded: if the ports cannot be reopened
    or the database update fails, the failure is reported through
    error_occurred and the session stays paused so the user can retry
    or stop it.

    Does nothing when the session is not paused.
    """
    if not self.is_paused:
        return

    try:
        # Reopen ports
        if not self._open_ports():
            # Release anything a partially successful open left behind.
            self._close_ports()
            self.error_occurred.emit("Failed to reopen ports")
            return

        # Update database
        self.db_conn.execute("""
            UPDATE sessions SET status = 'active' WHERE session_id = ?
        """, (self.current_session_id,))
        self.db_conn.commit()

    except Exception as e:
        self._close_ports()
        self.error_occurred.emit(f"Failed to resume session: {str(e)}")
        return

    # Clear pause flags
    self.is_paused = False
    self.pause_queued = False

    # Resume command loop
    self.status_changed.emit("Session resumed")
    self._execute_command_loop()
|
|
|
|
def stop_session(self):
    """
    Request a stop of the session.

    While the command loop is active the stop is QUEUED and executes
    AFTER the current run completes, during the delay phase between runs.

    While the session is paused no command loop is running that could
    pick up a queued request, so the session is finalized as 'aborted'
    right away; otherwise it would remain marked as running forever.

    Does nothing when no session is running.
    """
    if not self.is_running:
        return

    if self.is_paused:
        self.is_paused = False
        self.pause_queued = False
        self.stop_queued = False
        self._finalize_session('aborted')
        self.status_changed.emit("Session stopped by user")
        return

    self.stop_queued = True
    self.status_changed.emit("Stop queued (will execute after current run)")
|
|
|
|
# =========================================================================
|
|
# STATUS QUERIES
|
|
# =========================================================================
|
|
|
|
def get_current_session_id(self) -> Optional[str]:
    """Return the stored session ID (None until a session has been started)."""
    session_id = self.current_session_id
    return session_id
|
|
|
|
def get_session_name(self) -> Optional[str]:
    """Return the stored session name (None until a session has been loaded)."""
    name = self.session_name
    return name
|
|
|
|
def is_session_running(self) -> bool:
    """Return the value of the is_running flag."""
    running = self.is_running
    return running
|
|
|
|
def is_session_paused(self) -> bool:
    """Return the value of the is_paused flag."""
    paused = self.is_paused
    return paused
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN (for testing)
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
    # Informational banner only; running this module directly does not
    # start a session.
    print("Session Module - vzug-e-hinge")
    print("=" * 60)
    print()
    print("This module orchestrates automated test sessions.")
    print()
    print("Features:")
    print(" ✓ Profile loading (interface + session)")
    print(" ✓ Automatic port management")
    print(" ✓ Command sequence execution")
    print(" ✓ Queued pause/stop (executes after current run)")
    print(" ✓ Real-time status updates via signals")
    print(" ✓ Comprehensive error handling")
    print()
    print("Usage:")
    print(" from session import Session")
    print(" session = Session(db_manager)")
    # The second positional parameter of load_session is the Init phase,
    # so the usage hint names the phase explicitly.
    print(" session.load_session(interface_id, execute_session_id=session_id)")
    print(" session.start_session()")
    print()
    print("Ready to be used by session_widget.py!")
|