#!/usr/bin/env python3
"""
Run Module - vzug-e-hinge
==========================

Executes a single RUN (one UART command with data collection).

Flow:
1. Configure UART packet detection with callback
2. Callback triggers I2C read (real-time correlation)
3. Send UART command
4. Wait for stop condition
5. Decode packets (call decoder.py)
6. Save to database (telemetry_decoded + telemetry_raw)

Author: Kynsight
Version: 1.0.0
Date: 2025-11-09
"""

import sqlite3
import statistics
import time
from typing import Callable, List, Optional, Tuple

# Import global clock for timestamp synchronization
from global_clock import GlobalClock, now_ns as global_now_ns

# Import UART core
from uart.uart_kit.uart_core import (
    UARTPort,
    PacketConfig,
    PacketInfo,
    uart_write,
    uart_send_and_read_pgkomm2,
    uart_start_listening_with_packets,
    uart_stop_listening,
    uart_get_detected_packets,
    uart_get_packet_errors,
    uart_clear_detected_packets,
    uart_read_buffer,
    Status as UARTStatus
)

# Import I2C core
from i2c.i2c_kit.i2c_core import (
    I2CHandle,
    i2c_read_block,
    Status as I2CStatus
)

# Import buffer utilities
from buffer_kit.circular_buffer import cb_fill_bytes, cb_capacity

# Import decoder
from decoder import decode_uart_packet, decode_i2c_sample

# Import Kalman filter
from kalman_filter import AngleKalmanFilter


# Interval between polls of the logger port's detected-packet list.
_POLL_INTERVAL_S = 0.05

# Fallbacks used when the caller passes None for a timeout.
# NOTE(review): the stop fallback (150 ms) differs from the 5000 ms default in
# the execute_run signatures - confirm which value is intended.
_DEFAULT_GRACE_MS = 1500
_DEFAULT_STOP_MS = 150

# Capture window passed to uart_send_and_read_pgkomm2.
_PGKOMM2_CAPTURE_MAX_MS = 30

# Zeroing: up to 50 samples, 10 ms apart, within a 500 ms window.
_ZERO_SAMPLE_COUNT = 50
_ZERO_DURATION_MS = 500
_ZERO_INTERVAL_MS = 10

# Address byte pairs (frame[2], frame[3]) recognised when reporting RX frames.
_ADDR_STATUS_BROADCAST = (0x53, 0x42)  # "SB" - skipped in the RX report
_RX_FRAME_LABELS = {
    (0x50, 0x48): " (Echo)",      # "PH"
    (0x48, 0x50): " (Response)",  # "HP"
}

# Signature of the status/raw-data callback: (direction_or_level, message).
RawDataCallback = Optional[Callable[[str, str], None]]


class RunExecutor:
    """
    Executes a single RUN.

    A RUN consists of:
    - Send UART command OR execute I2C command
    - Collect UART packets (with timestamps)
    - Trigger I2C reads via callback (correlated timestamps)
    - Wait for stop condition
    - Decode all data
    - Save to database
    """

    def __init__(self, db_connection: sqlite3.Connection):
        """
        Initialize run executor.

        Args:
            db_connection: Database connection
        """
        self.db_conn = db_connection
        self.i2c_readings = []       # Storage for I2C readings from callback
        self.i2c_failures = 0        # Counter for I2C read failures
        self.i2c_zero_reference = 0  # Absolute angle used as zero (0 = not zeroed)

        # For angular velocity/acceleration calculation
        self.prev_angle_deg = None
        self.prev_timestamp_ns = None
        self.prev_velocity_deg_s = None

        # Kalman filter for angle smoothing (reset per run)
        self.kalman_filter = None

    def execute_run(
        self,
        session_id: str,
        session_name: str,
        run_no: int,
        command_id: int,
        command_hex: str,
        uart_command_port: UARTPort,
        uart_logger_port: Optional[UARTPort],
        i2c_port: Optional[I2CHandle],
        packet_config: PacketConfig,
        i2c_address: int = 0x40,
        i2c_register: int = 0xFE,
        stop_timeout_ms: int = 5000,
        grace_timeout_ms: int = 1500,
        raw_data_callback: RawDataCallback = None
    ) -> Tuple[str, int, str]:
        """
        Execute a single RUN.

        Args:
            session_id: Session ID
            session_name: Session name
            run_no: Run number (1, 2, 3, ...)
            command_id: UART command ID from database
            command_hex: Command hex string (e.g., "DD 22 50 48...")
            uart_command_port: UART command port (TX/RX for commands)
            uart_logger_port: UART logger port (RX for telemetry, None if disabled)
            i2c_port: I2C port (optional, for angle readings)
            packet_config: Packet detection configuration
            i2c_address: I2C device address read on every detected packet
            i2c_register: I2C register address read on every detected packet
            stop_timeout_ms: Silence after the last packet that ends the run
                (None falls back to 150 ms)
            grace_timeout_ms: Maximum wait for the first packet
                (None falls back to 1500 ms)
            raw_data_callback: Optional callback(kind, message) for TX/RX/INFO/ERROR
                status output

        Returns:
            (status, packet_count, error_msg)
            - status: "success" or "error"
            - packet_count: Number of packets detected
            - error_msg: Error message if status="error", empty otherwise
        """
        try:
            # Clear previous packets (only if logger port exists)
            if uart_logger_port:
                uart_clear_detected_packets(uart_logger_port)

            self.i2c_readings.clear()
            self.i2c_failures = 0  # Reset error counter

            # Reset velocity/acceleration tracking for new run
            self.prev_angle_deg = None
            self.prev_timestamp_ns = None
            self.prev_velocity_deg_s = None

            # Reset Kalman filter for new run
            self.kalman_filter = AngleKalmanFilter(
                process_noise=0.1,        # Tunable: system dynamics uncertainty
                measurement_noise=0.022,  # 14-bit encoder quantization
                initial_angle=0.0         # Will be set by first measurement
            )

            # Record run start time (using global clock for consistency with packet timestamps)
            # packet_info.start_timestamp is in nanoseconds (UART core converts internally)
            run_start_ns = global_now_ns()

            detection_enabled = bool(uart_logger_port and packet_config.enable)

            # ================================================================
            # 1. Configure packet detection with callback (LOGGER PORT)
            # ================================================================
            if detection_enabled:
                # Debug: Check if I2C is available
                if raw_data_callback:
                    if i2c_port:
                        raw_data_callback("INFO", "I2C enabled: will trigger reads on packet detection")
                    else:
                        raw_data_callback("INFO", "I2C disabled: no I2C port available")

                callback_count = 0

                def on_uart_packet_detected(timestamp_ns: int):
                    """
                    Called immediately when UART packet detected.
                    Triggers I2C read for timestamp correlation.
                    """
                    nonlocal callback_count
                    callback_count += 1

                    if not i2c_port:
                        return

                    # Read I2C angle immediately (2 bytes from the configured register)
                    read_status, i2c_bytes = i2c_read_block(
                        i2c_port,
                        i2c_address,   # Device address from session config
                        i2c_register,  # Register address from session config
                        2
                    )

                    if read_status == I2CStatus.OK:
                        # Store with correlated timestamp
                        self.i2c_readings.append({
                            'timestamp_ns': timestamp_ns,
                            'i2c_bytes': i2c_bytes
                        })
                    else:
                        # I2C read failed - count the failure
                        self.i2c_failures += 1
                        # Only print for the first few callbacks to avoid flooding stdout
                        if callback_count <= 3:
                            print(f"[DEBUG] I2C read failed: {read_status}")

                # Create packet config with callback
                packet_config_with_callback = PacketConfig(
                    enable=packet_config.enable,
                    start_marker=packet_config.start_marker,
                    packet_length=packet_config.packet_length,
                    end_marker=packet_config.end_marker,
                    on_packet_callback=on_uart_packet_detected if i2c_port else None
                )

                # Debug: Verify callback is attached
                if raw_data_callback:
                    has_callback = packet_config_with_callback.on_packet_callback is not None
                    raw_data_callback("INFO", f"Packet config: callback={'attached' if has_callback else 'None'}")

                # Start listening with packet detection on LOGGER PORT
                status = uart_start_listening_with_packets(uart_logger_port, packet_config_with_callback)
                if status != UARTStatus.OK:
                    return ("error", 0, "Failed to start UART packet detection")

            # ================================================================
            # 2. Send UART command (COMMAND PORT) - Using PGKomm2
            # ================================================================
            # Parse hex string to bytes
            command_bytes = self._parse_hex_string(command_hex)
            if not command_bytes:
                if uart_logger_port:
                    uart_stop_listening(uart_logger_port)
                return ("error", 0, f"Invalid command hex string: {command_hex}")

            # Emit TX data (command to be sent)
            if raw_data_callback:
                hex_tx, ascii_tx = self._format_hex_ascii(command_bytes)
                raw_data_callback("TX", f"{hex_tx} | '{ascii_tx}'")

            # Send command via PGKomm2 (always use this mode for sessions)
            status, frames = uart_send_and_read_pgkomm2(
                uart_command_port,
                command_bytes,
                capture_max_ms=_PGKOMM2_CAPTURE_MAX_MS,  # Default PGKomm2 timeout
                log_callback=raw_data_callback           # Pass callback for logging
            )

            if status != UARTStatus.OK:
                if uart_logger_port:
                    uart_stop_listening(uart_logger_port)
                return ("error", 0, f"PGKomm2 command failed: {status}")

            # Emit RX data (frames received) - only show Echo and Response, skip SB broadcasts
            if raw_data_callback and frames:
                self._report_rx_frames(frames, raw_data_callback)

            # ================================================================
            # 3. Wait for logger packets (polling mode with stop condition)
            # ================================================================
            uart_packets = []
            if detection_enabled:
                uart_packets, wait_error = self._wait_for_logger_packets(
                    uart_logger_port,
                    grace_timeout_ms,
                    stop_timeout_ms,
                    raw_data_callback
                )
                if wait_error:
                    return ("error", 0, wait_error)
            elif uart_logger_port:
                # Logger enabled but packet detection disabled - just stop listening
                uart_stop_listening(uart_logger_port)

            packet_count = len(uart_packets)

            # ================================================================
            # 4. Decode and save data
            # ================================================================
            if raw_data_callback:
                i2c_count = len(self.i2c_readings)
                if i2c_count == 0 and i2c_port and packet_config.enable:
                    # Expected I2C but got none - report
                    raw_data_callback("ERROR", f"⚠ No I2C readings captured (expected ~{packet_count})")
                    if self.i2c_failures > 0:
                        raw_data_callback("ERROR", f"I2C failures: {self.i2c_failures}")

                raw_data_callback("INFO", f"Decoding and saving {packet_count} UART packets + {i2c_count} I2C readings...")

            # Create timestamp → I2C reading map for matching
            i2c_by_timestamp = {
                reading['timestamp_ns']: reading['i2c_bytes']
                for reading in self.i2c_readings
            }

            # Decode and save UART packets WITH correlated I2C data
            # NOTE(review): if a save raises part-way, the rows already inserted
            # stay pending on the connection (no rollback here) - confirm the
            # caller's transaction handling.
            for pkt in uart_packets:
                self._save_combined_telemetry(
                    session_id=session_id,
                    session_name=session_name,
                    run_no=run_no,
                    run_command_id=command_id,
                    packet_info=pkt,
                    # Matching I2C reading by timestamp (None if there is none)
                    i2c_bytes=i2c_by_timestamp.get(pkt.start_timestamp, None),
                    run_start_ns=run_start_ns
                )

            # Commit database changes
            self.db_conn.commit()

            if raw_data_callback:
                raw_data_callback("INFO", f"✓ Database saved: {packet_count} UART packets, {len(self.i2c_readings)} I2C readings")

            # Report errors if any via callback
            if self.i2c_failures > 0 and raw_data_callback:
                raw_data_callback("ERROR", f"I2C read failures: {self.i2c_failures}")

            return ("success", packet_count, "")

        except Exception as e:
            # Stop listening if still active (logger port). This cleanup is
            # best-effort: a failure here must not mask the original error.
            try:
                if uart_logger_port:
                    uart_stop_listening(uart_logger_port)
            except Exception:
                pass

            return ("error", 0, f"Exception during run: {str(e)}")

    @staticmethod
    def _format_hex_ascii(data) -> Tuple[str, str]:
        """
        Render a byte sequence as (hex_text, ascii_text) for display.

        hex_text is every byte as two upper-case hex digits separated by spaces.
        ascii_text skips the first two bytes (the DD 22 magic bytes) when at
        least two bytes are present, and replaces non-printable bytes with '.'.
        """
        hex_text = ' '.join(f'{b:02X}' for b in data)
        payload = data[2:] if len(data) >= 2 else data
        ascii_text = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in payload)
        return hex_text, ascii_text

    def _report_rx_frames(self, frames, raw_data_callback) -> None:
        """
        Emit one "RX" callback line per received frame.

        Frames of at least 5 bytes are classified by their address bytes
        (frame[2], frame[3]): SB status broadcasts are skipped, PH is labelled
        "(Echo)", HP is labelled "(Response)". Anything else, including shorter
        frames, is reported without a label.
        """
        for frame in frames:
            label = ""
            if len(frame) >= 5:
                address = (frame[2], frame[3])
                # Skip SB status broadcasts (background noise from device)
                if address == _ADDR_STATUS_BROADCAST:
                    continue
                label = _RX_FRAME_LABELS.get(address, "")

            hex_rx, ascii_rx = self._format_hex_ascii(frame)
            raw_data_callback("RX", f"{hex_rx}{label} | '{ascii_rx}'")

    def _wait_for_logger_packets(
        self,
        uart_logger_port: UARTPort,
        grace_timeout_ms: Optional[int],
        stop_timeout_ms: Optional[int],
        raw_data_callback: RawDataCallback
    ) -> Tuple[list, str]:
        """
        Poll the logger port until the stop condition or the grace timeout.

        The grace timeout bounds the wait for the first packet; once a packet
        has arrived, the run ends after stop_timeout_ms without new packets.
        Listening on the logger port is stopped before returning in both cases.

        Returns:
            (packets, error_msg) - error_msg is empty on success; on grace
            timeout packets is empty and error_msg describes the failure.
        """
        # Use defaults if None from database
        grace_ms = grace_timeout_ms if grace_timeout_ms is not None else _DEFAULT_GRACE_MS
        stop_ms = stop_timeout_ms if stop_timeout_ms is not None else _DEFAULT_STOP_MS

        grace_timeout_s = grace_ms / 1000.0  # Wait for first packet
        stop_timeout_s = stop_ms / 1000.0    # Silence between packets

        last_packet_count = 0
        last_packet_time = 0.0
        first_packet_received = False
        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments.
        start_time = time.monotonic()

        if raw_data_callback:
            raw_data_callback("INFO", f"Waiting for logger packets (grace: {grace_timeout_s*1000:.0f}ms, timeout: {stop_timeout_s*1000:.0f}ms)...")

        while True:
            time.sleep(_POLL_INTERVAL_S)  # Poll every 50ms
            current_time = time.monotonic()

            # Get current packet count
            current_packets = uart_get_detected_packets(uart_logger_port)
            current_count = len(current_packets)

            # Check if new packets arrived
            if current_count > last_packet_count:
                last_packet_count = current_count
                last_packet_time = current_time

                if not first_packet_received:
                    first_packet_received = True
                    if raw_data_callback:
                        raw_data_callback("INFO", "First logger packet received, monitoring for stop condition...")

            # Grace period check (only if no packets yet)
            if not first_packet_received:
                if current_time - start_time >= grace_timeout_s:
                    # Grace period expired, no packets
                    uart_stop_listening(uart_logger_port)
                    return [], f"Logger not responding (grace timeout: {grace_timeout_s*1000:.0f}ms)"
                continue

            # Stop timeout check (only after first packet received)
            if current_time - last_packet_time < stop_timeout_s:
                continue

            # Stop condition met!
            if raw_data_callback:
                # Report buffer status
                if uart_logger_port._rx_buffer:
                    fill_mb = cb_fill_bytes(uart_logger_port._rx_buffer) / (1024 * 1024)
                    cap_mb = cb_capacity(uart_logger_port._rx_buffer) / (1024 * 1024)
                    raw_data_callback("INFO", f"Buffer: {fill_mb:.2f}MB / {cap_mb:.1f}MB")

                # Report packet statistics
                packet_errors = uart_get_packet_errors(uart_logger_port)
                if packet_errors > 0:
                    raw_data_callback("ERROR", f"⚠ Packet errors: {packet_errors} packets with end marker mismatch")
                raw_data_callback("INFO", f"✓ Valid packets: {current_count}")
                raw_data_callback("INFO", f"Stop condition: {stop_timeout_s*1000:.0f}ms silence detected")

            # Stop listening (but keep port open for next command)
            uart_stop_listening(uart_logger_port)
            return current_packets, ""

    def execute_i2c_command(
        self,
        session_id: str,
        session_name: str,
        run_no: int,
        command_id: int,
        command_name: str,
        operation: str,
        device_address: int,
        register: int,
        hex_string: str,
        i2c_port: Optional[I2CHandle],
        raw_data_callback: RawDataCallback = None
    ) -> Tuple[str, int, str]:
        """
        Execute a single I2C command.

        Args:
            session_id: Session ID
            session_name: Session name
            run_no: Run number
            command_id: I2C command ID from database
            command_name: Command name (e.g., "zero", "read angle")
            operation: Operation type (e.g., "read", "write", "zero"); only
                "read" and zeroing are implemented here
            device_address: I2C device address (e.g., 0x40)
            register: Register address (e.g., 0xFE)
            hex_string: Number of bytes to read, parsed as hexadecimal when it
                is a string; falls back to 2 when it cannot be parsed
            i2c_port: I2C handle
            raw_data_callback: Callback for status updates

        Returns:
            (status, data_count, error_msg)
        """
        try:
            if not i2c_port:
                return ("error", 0, "I2C port not available")

            run_start_ns = global_now_ns()

            # Parse hex_string as byte count for reads
            try:
                byte_count = int(hex_string, 16) if isinstance(hex_string, str) else int(hex_string)
            except (ValueError, TypeError):
                byte_count = 2  # Default to 2 bytes

            op = operation.lower()

            # Special handling for "zero" operation (also matched via the command name)
            if op == "zero" or "zero" in command_name.lower():
                return self._execute_i2c_zero(
                    session_id=session_id,
                    session_name=session_name,
                    run_no=run_no,
                    command_id=command_id,
                    device_address=device_address,
                    register=register,
                    i2c_port=i2c_port,
                    run_start_ns=run_start_ns,
                    raw_data_callback=raw_data_callback
                )

            if op != "read":
                return ("error", 0, f"Unsupported I2C operation: {operation}")

            # Regular I2C read
            if raw_data_callback:
                raw_data_callback("INFO", f"Read: addr=0x{device_address:02X} reg=0x{register:02X} len={byte_count}")

            status, data = i2c_read_block(i2c_port, device_address, register, byte_count)
            if status != I2CStatus.OK:
                return ("error", 0, f"I2C read failed: {status}")

            if raw_data_callback:
                hex_data = ' '.join(f'{b:02X}' for b in data)
                raw_data_callback("RX", f"{hex_data}")
            return ("success", len(data), "")

        except Exception as e:
            return ("error", 0, f"Exception during I2C command: {str(e)}")

    def _execute_i2c_zero(
        self,
        session_id: str,
        session_name: str,
        run_no: int,
        command_id: int,
        device_address: int,
        register: int,
        i2c_port: I2CHandle,
        run_start_ns: int,
        raw_data_callback: RawDataCallback = None
    ) -> Tuple[str, int, str]:
        """
        Execute I2C zeroing: take up to 50 samples, calculate median as zero reference.

        Sampling stops after 50 valid samples or 500 ms, whichever comes first;
        failed or short reads are skipped. The median is stored in
        self.i2c_zero_reference (memory only). The session/run arguments are
        accepted but not used by this method.

        Returns:
            (status, sample_count, error_msg)
        """
        if raw_data_callback:
            raw_data_callback("INFO", "Zeroing: Collecting 50 samples...")

        samples = []
        interval_s = _ZERO_INTERVAL_MS / 1000.0
        # Monotonic clock so the sampling window is immune to wall-clock changes.
        deadline = time.monotonic() + (_ZERO_DURATION_MS / 1000.0)

        while time.monotonic() < deadline and len(samples) < _ZERO_SAMPLE_COUNT:
            status, data = i2c_read_block(i2c_port, device_address, register, 2)

            if status == I2CStatus.OK and len(data) == 2:
                # Convert to 14-bit raw value (big-endian)
                raw16 = ((data[0] & 0xFF) << 8) | (data[1] & 0xFF)
                samples.append(raw16 & 0x3FFF)

            time.sleep(interval_s)

        if not samples:
            if raw_data_callback:
                raw_data_callback("ERROR", "I2C Zeroing failed: No valid samples")
            return ("error", 0, "No valid samples collected")

        # Calculate median (better than average for outlier rejection)
        median_raw14 = int(statistics.median(samples))
        self.i2c_zero_reference = median_raw14

        # Convert to angle (360° / 16384 counts)
        zero_angle_deg = (median_raw14 * 360.0) / 16384.0

        if raw_data_callback:
            raw_data_callback("INFO", f"✓ Zero set: raw14={median_raw14} ({zero_angle_deg:.2f}°) from {len(samples)} samples")

        # Zero reference stored in memory only (self.i2c_zero_reference)
        # Will be saved to telemetry tables when UART commands execute
        return ("success", len(samples), "")

    def _parse_hex_string(self, hex_str: str) -> Optional[bytes]:
        """
        Parse hex string to bytes.

        Args:
            hex_str: Hex string (e.g., "DD 22 50 48" or "DD225048")

        Returns:
            Bytes or None if invalid (non-hex characters, odd digit count,
            or a non-string argument)
        """
        try:
            # Remove spaces and convert
            return bytes.fromhex(hex_str.replace(' ', ''))
        except (ValueError, AttributeError, TypeError):
            return None

    def _save_combined_telemetry(
        self,
        session_id: str,
        session_name: str,
        run_no: int,
        run_command_id: int,
        packet_info: PacketInfo,
        i2c_bytes: Optional[bytes],
        run_start_ns: int
    ):
        """
        Save combined UART + I2C telemetry to database (single row).

        Saves to both telemetry_raw and telemetry_decoded tables.
        UART and I2C data are correlated by timestamp and saved together.
        Does not commit; the caller commits after all packets are saved.
        """
        # Decode packets
        decoded_uart = decode_uart_packet(packet_info.data)
        decoded_i2c = decode_i2c_sample(i2c_bytes, self.i2c_zero_reference) if i2c_bytes else None

        # Calculate relative time from run start (both in nanoseconds, convert to milliseconds)
        time_ms = (packet_info.start_timestamp - run_start_ns) / 1_000_000.0

        # Calculate angular velocity and acceleration using Kalman filter (if I2C data available)
        angular_velocity = None
        angular_acceleration = None

        if (
            decoded_i2c
            and decoded_i2c.get('i2c_angle_deg') is not None
            and self.kalman_filter is not None
        ):
            # Process measurement through Kalman filter; the filtered angle
            # itself is not stored, only the derived velocity/acceleration.
            _, angular_velocity, angular_acceleration = self.kalman_filter.process(
                angle_measurement=decoded_i2c.get('i2c_angle_deg'),
                timestamp_ns=packet_info.start_timestamp  # In nanoseconds
            )

        # I2C decoded fields are all None when no I2C sample was captured
        i2c_fields = decoded_i2c if decoded_i2c else {}

        # Save to telemetry_raw (backup) - BOTH uart_raw_packet AND i2c_raw_bytes in ONE row
        cursor = self.db_conn.cursor()
        cursor.execute("""
            INSERT INTO telemetry_raw (
                session_id, session_name, run_no, run_command_id,
                t_ns, time_ms, uart_raw_packet, i2c_raw_bytes, i2c_zero_ref
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            session_id,
            session_name,
            run_no,
            run_command_id,
            packet_info.start_timestamp,  # Already in nanoseconds
            time_ms,
            packet_info.data,
            i2c_bytes,                    # Can be None if no I2C
            self.i2c_zero_reference       # Zero reference (0 if not zeroed)
        ))

        # Save to telemetry_decoded (main data - all decoded fields)
        cursor.execute("""
            INSERT INTO telemetry_decoded (
                session_id, session_name, run_no, run_command_id,
                t_ns, time_ms,
                motor_current, encoder_value, relative_encoder_value,
                v24_pec_diff, pwm,
                i2c_raw14, i2c_zero_raw14, i2c_delta_raw14,
                i2c_angle_deg, i2c_zero_angle_deg,
                angular_velocity, angular_acceleration, i2c_zero_ref
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            session_id,
            session_name,
            run_no,
            run_command_id,
            packet_info.start_timestamp,  # Already in nanoseconds
            time_ms,
            # UART decoded fields
            decoded_uart.get('motor_current'),
            decoded_uart.get('encoder_value'),
            decoded_uart.get('relative_encoder_value'),
            decoded_uart.get('v24_pec_diff'),
            decoded_uart.get('pwm'),
            # I2C decoded fields
            i2c_fields.get('i2c_raw14'),
            i2c_fields.get('i2c_zero_raw14'),
            i2c_fields.get('i2c_delta_raw14'),
            i2c_fields.get('i2c_angle_deg'),
            i2c_fields.get('i2c_zero_angle_deg'),
            # Derived fields
            angular_velocity,
            angular_acceleration,
            self.i2c_zero_reference       # Zero reference (0 if not zeroed)
        ))


# =============================================================================
# Convenience function for external use
# =============================================================================

def execute_run(
    db_connection: sqlite3.Connection,
    session_id: str,
    session_name: str,
    run_no: int,
    command_id: int,
    command_hex: str,
    uart_command_port: UARTPort,
    uart_logger_port: Optional[UARTPort],
    i2c_port: Optional[I2CHandle],
    packet_config: PacketConfig,
    i2c_address: int = 0x40,
    i2c_register: int = 0xFE,
    stop_timeout_ms: int = 5000,
    grace_timeout_ms: int = 1500,
    i2c_zero_ref: int = 0,
    raw_data_callback: RawDataCallback = None
) -> Tuple[str, int, str]:
    """
    Execute a single RUN (convenience function).

    Creates a fresh RunExecutor, seeds it with the session's zero reference
    and delegates to RunExecutor.execute_run.

    Args:
        db_connection: Database connection
        session_id: Session ID
        session_name: Session name
        run_no: Run number
        command_id: UART command ID
        command_hex: Command hex string
        uart_command_port: UART command port (TX/RX for commands)
        uart_logger_port: UART logger port (RX for telemetry, optional)
        i2c_port: I2C port (optional)
        packet_config: Packet detection configuration
        i2c_address: I2C device address
        i2c_register: I2C register address
        stop_timeout_ms: Stop condition timeout
        grace_timeout_ms: Grace period before first packet
        i2c_zero_ref: I2C zero reference (0 = not zeroed)
        raw_data_callback: Callback for raw data display (direction, hex_string)

    Returns:
        (status, packet_count, error_msg)
    """
    executor = RunExecutor(db_connection)
    executor.i2c_zero_reference = i2c_zero_ref  # Set zero reference from session
    return executor.execute_run(
        session_id=session_id,
        session_name=session_name,
        run_no=run_no,
        command_id=command_id,
        command_hex=command_hex,
        uart_command_port=uart_command_port,
        uart_logger_port=uart_logger_port,
        i2c_port=i2c_port,
        packet_config=packet_config,
        i2c_address=i2c_address,
        i2c_register=i2c_register,
        stop_timeout_ms=stop_timeout_ms,
        grace_timeout_ms=grace_timeout_ms,
        raw_data_callback=raw_data_callback
    )


def execute_i2c_command(
    db_connection: sqlite3.Connection,
    session_id: str,
    session_name: str,
    run_no: int,
    command_id: int,
    command_name: str,
    operation: str,
    device_address: int,
    register: int,
    hex_string: str,
    i2c_port: Optional[I2CHandle],
    i2c_zero_ref: int = 0,
    raw_data_callback: RawDataCallback = None
) -> Tuple[str, int, str, int]:
    """
    Execute a single I2C command (convenience function).

    Creates a fresh RunExecutor, seeds it with the session's zero reference
    and delegates to RunExecutor.execute_i2c_command.

    Args:
        db_connection: Database connection
        session_id: Session ID
        session_name: Session name
        run_no: Run number
        command_id: I2C command ID
        command_name: Command name
        operation: Operation type (read/write/zero)
        device_address: I2C device address
        register: Register address
        hex_string: Byte count or data
        i2c_port: I2C handle
        i2c_zero_ref: Current I2C zero reference (0 = not zeroed)
        raw_data_callback: Callback for status updates

    Returns:
        (status, data_count, error_msg, updated_i2c_zero_ref)
    """
    executor = RunExecutor(db_connection)
    executor.i2c_zero_reference = i2c_zero_ref  # Set zero reference from session

    status, data_count, error_msg = executor.execute_i2c_command(
        session_id=session_id,
        session_name=session_name,
        run_no=run_no,
        command_id=command_id,
        command_name=command_name,
        operation=operation,
        device_address=device_address,
        register=register,
        hex_string=hex_string,
        i2c_port=i2c_port,
        raw_data_callback=raw_data_callback
    )

    # Return updated zero reference (may have changed if zero command)
    return (status, data_count, error_msg, executor.i2c_zero_reference)


if __name__ == "__main__":
    print("Run Module")
    print("=" * 60)
    print("This module executes a single RUN.")
    print("It should be called by session.py, not run directly.")
    print()
    print("Features:")
    print("✓ UART packet detection with callback")
    print("✓ Real-time I2C triggering")
    print("✓ Decoder integration")
    print("✓ Database storage (telemetry_raw + telemetry_decoded)")
    print("✓ Error handling")
    print()
    print("Ready to be used by session.py!")