21 KiB
CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Project Overview
vzug-e-hinge Test & Control System - A PyQt6-based desktop application for testing and controlling e-hinge devices through UART and I2C interfaces. The system provides automated test session execution, real-time data logging, packet detection, and telemetry storage in SQLite.
Running the Application
# Install dependencies
pip install -r requirements.txt
# Run the application (uses default database path: ./database/ehinge.db)
python main.py
# Run with custom database path
python main.py /path/to/custom.db
Database Management
# Initialize/recreate database
python database/init_database.py
# Recreate database (overwrite existing)
python database/init_database.py --overwrite
# Check database health
python database/init_database.py --check
Architecture
Application Entry Point
main.py- Main application window with tabbed interface (Session, Configure Session, Configure Interface, UART, I2C, Graph)MainWindowclass manages all tabs and coordinates database connection
Core Session Execution Flow
The application uses a three-layer execution model:
-
Session Layer (
session.py) - Orchestrates the complete test session- Loads interface and session profiles from database
- Opens/closes hardware ports (UART command, UART logger, I2C)
- Manages command sequence execution
- Handles pause/stop requests (queued after current run)
- Emits PyQt signals for real-time GUI updates
-
Run Layer (
run.py) - Executes a single RUN (one command with data collection)- Configures packet detection with callback
- Sends UART command via command port
- Collects telemetry packets from logger port (if enabled)
- Triggers I2C reads correlated to UART timestamps
- Decodes data and saves to database
-
Hardware Layer (
uart/uart_kit/uart_core.py,i2c/i2c_kit/i2c_core.py) - Low-level interface management- UART: Packet detection, circular buffer, reader threads, timestamping
- I2C: Bus operations, device scanning, continuous logging
Critical Architectural Concepts
Dual UART Port System
The application uses two separate UART ports for different purposes:
- UART Command Port (TX/RX): Sends commands to device and receives ACK/responses
- UART Logger Port (RX only, optional): Receives telemetry packets from device
- This separation allows simultaneous command sending and telemetry logging
- Logger port can be disabled for simple command/response mode
Packet Detection vs. Raw Mode
- Packet Detection Enabled: UART Logger port detects packets with start/end markers, used for telemetry streams
- Packet Detection Disabled: Raw TX/RX mode for simple command/response (set
print_command_rx=Truein session profile)
Queued Pause/Stop
- Pause and Stop requests are queued and execute AFTER the current run completes
- Users can press pause/stop anytime, but it only takes effect during the delay phase between runs
- This prevents data corruption by ensuring runs complete atomically
PGKomm2 Protocol (V-ZUG Serial Communication)
The e-hinge devices use PGKomm2 protocol (V-ZUG spec A5.5093D-AB) for UART communication:
Frame Format:
DD 22 | ADR1 ADR2 | LEN | DATA (0-255 bytes) | BCC
MAGIC | Address | Len | Payload | Checksum
Key Characteristics:
- MAGIC:
0xDD(221), INVMAGIC:0x22(34) - Length-delimited: LEN field specifies DATA length (no terminator bytes like
\n) - Address swap: Slave response swaps ADR1/ADR2 (if master sends
PH, slave respondsHP) - BCC: Block check character calculated from ADR1 through DATA (XOR checksum) - VALIDATED on receive
- Timing: Response time < 15 ms, inter-byte time < 10 ms
- Baud rates: 4800 or 115200 baud, 8N1 with Even Parity
Common Addresses:
PH(0x50 0x48): Command from master → Echo from slaveHP(0x48 0x50): Response from slaveSB(0x53 0x42): Status broadcast
Multiple Frames: Device typically sends multiple frames per command:
- Echo (PH): Device echoes received command
- Response (HP): Actual response with data
- Status (SB): Optional status update
Example:
TX: DD 22 50 48 02 43 4F 16 (Master sends PH command)
RX: DD 22 50 48 02 43 4F 16 (Echo: PH)
RX: DD 22 48 50 02 43 4F 16 (Response: HP, addresses swapped)
RX: DD 22 53 42 01 4E 5E (Status: SB)
Documentation:
- Protocol spec:
v_zug_documentation/PGKomm2_Spec_A5.5093D_AB.pdf - Application telegrams:
v_zug_documentation/PgKomm2_ Application Telegrams-v122-20230417_181422.pdf
Quick Start (Final Working Implementation):
- GUI defaults automatically set: Parity=Even, Mode=PGKomm2, Timeout=30ms
- Connect → waits 100ms settle delay
- First 1-2 commands may timeout (device synchronization)
- After 3-4 commands → rock solid reliability
- BCC errors logged to console if frame corruption detected
Implementation Notes (uart/uart_kit/uart_core.py::uart_send_and_read_pgkomm2()):
The implementation uses a simple single-loop approach (matches old working code exactly):
# Flush buffer and send
reset_input_buffer()
uart_write(tx_data)
# Single loop with deadline
deadline = now + 30ms # Total timeout (spec <15ms + real-world margin)
while now < deadline:
if bytes_available:
rx_buffer += read_all_available() # Read IMMEDIATELY
# Extract frames using LEN field
while True:
frame = extract_frame(rx_buffer) # Uses LEN to know size
validate_BCC(frame) # Reject corrupted
if frame is None:
break # Need more bytes
collected_frames.append(frame)
if frame.address == HP:
return collected_frames # Stop on HP!
# No sleep - just spin
Key Implementation Details:
- Single timeout (30ms): Total window for entire operation (spec says < 15ms, +15ms margin for real-world conditions)
- Read immediately: When bytes arrive, read them instantly (no delays)
- No sleep: Pure spinning for minimum latency
- Stop on HP: Exits immediately when HP response detected
- BCC validation: Every frame validated before acceptance - corrupted frames rejected and logged
- LEN field: Uses frame length to know exactly how many bytes to read
- Simple: No phases, no grace periods, just read-parse-detect loop
- Debug logging: TIMEOUT and IO_ERROR cases logged with buffer contents for diagnostics
Default GUI Settings (for PGKomm2):
- Parity: Even (required by spec)
- Mode: PGKomm2
- Timeout: 30ms (hidden, automatic)
Why 30ms (not 15ms)?
- Spec says < 15ms: Ideal conditions only
- Real world: Background SB telemetry, device load, USB latency, reader thread stop/restart add overhead
- Testing results: 20ms still showed occasional timeouts, 30ms more reliable
- Still fast: Stops immediately on HP detection (typically 8-12ms actual response time)
Error Handling:
- TIMEOUT: No data received within 30ms (device not responding) or got frames but no HP response
- IO_ERROR: Unparseable data in buffer (incomplete frame or BCC failure)
- BCC ERROR: Logged to console when checksum validation fails (electrical noise/corruption)
- Format:
[PGKOMM2] ✗ BCC FAIL: ADR=XX YY, calc=ZZ, recv=WW - Shows calculated vs received BCC and full frame hex dump
- Format:
Connection Behavior:
- 100ms settle delay: After port opens, waits 100ms for device to stabilize
- Reason: Device may reset on DTR/RTS change, needs boot/initialization time
- First commands: May still timeout if device is sending SB status broadcasts
- After 3-4 commands: Device synchronized, all commands work reliably
Threading Architecture:
- UART Reader Thread: Always running (except during PGKomm2 operations)
- Reads serial port → circular buffer continuously
- Most modes read from circular buffer
- PGKomm2: No worker thread - called directly from GUI thread
- Blocks GUI for ~30ms (acceptable, keeps threading simple)
- Temporarily stops reader thread for exclusive serial access (prevents race condition)
- Reads directly from serial port (not circular buffer - lower latency)
- Restarts reader thread after completion
- Why stop reader thread?: PGKomm2 needs direct serial reads for low latency. Stopping reader prevents both threads competing for same bytes.
Key Design Decisions (Lessons Learned):
-
Simplicity Over Complexity
- Tried: Worker threads, two-phase reading, packet-driven with circular buffer
- Final: Direct serial reads with reader thread temporarily stopped
- Lesson: Simple approach that matches old working code is best
-
Timeout Tuning
- Spec: < 15ms response time
- Reality: 30ms needed (background telemetry, thread overhead, USB latency)
- Testing: 15ms → occasional timeouts, 20ms → still issues, 30ms → reliable
-
Thread Safety
- Problem: Reader thread and PGKomm2 competing for serial bytes (race condition)
- Solution: Stop reader during PGKomm2 operations (exclusive access)
- Benefit: Simple, predictable, no race conditions
-
BCC Validation Critical
- Initially: Not validated (trusted device)
- Reality: Occasional electrical noise/corruption
- Solution: Validate BCC, log errors, reject corrupted frames
- Result: More robust, easier debugging
-
First Commands After Connect
- Device needs time to stabilize after DTR/RTS change
- 100ms settle delay helps but not sufficient
- Accept first 1-2 timeouts as normal device synchronization
- After 3-4 commands: rock solid
PGKomm2 Implementation Status: ✅ Production Ready
Database Schema
Located in database/init_database.py:
Profiles:
interface_profiles- Hardware configuration (UART command/logger ports, I2C bus, packet detection settings)session_profiles- Test sequences (JSON command list with delays and repetitions)gui_profiles- GUI appearance settings
Execution:
sessions- Session records (links to profiles, tracks total runs, status)uart_commands- Predefined UART commands (hex strings, categories)i2c_commands- Predefined I2C operations (register access)
Telemetry:
telemetry_raw- Raw backup data (UART packets and I2C bytes as BLOB)telemetry_decoded- Processed data (motor current, encoder values, angles, etc.)
Both telemetry tables use nanosecond timestamps (t_ns) for precise correlation between UART and I2C data.
Widget Organization
Configuration Widgets:
configure_interface_widget.py- Create/edit interface profiles (UART/I2C settings)configure_session_widget.py- Create/edit session profiles (command sequences)
Control Widgets:
session_widget.py- Main control panel for running sessions (start/pause/stop, profile selection, status display)uart/uart_integrated_widget.py- Manual UART testing (command table + direct control)i2c/i2c_integrated_widget.py- Manual I2C testing (command table + direct control)
UART Widget Modes (uart/uart_kit/uart_core_widget.py):
The UART widget supports 4 operational modes:
-
Request-Response: Send command, wait for response packet (timeout or terminator-based)
- Uses packet detection with configurable stop condition
- Best for generic packet-based protocols
-
Polling: Continuous packet detection with grace period
- Waits for first byte (grace period), then captures burst
- Supports packet detection with callbacks
- Best for streaming telemetry
-
Listening: Raw stream mode with no stop condition
- Captures everything continuously
- Manual buffer reading required
- Best for debugging and raw data capture
-
PGKomm2: V-ZUG protocol with DD 22 framing and automatic HP detection
- Automatic response detection - No timing configuration needed
- Stops immediately when HP response detected (typically 5-10ms)
- 20ms safety timeout (hardcoded, hidden from user)
- Automatically parses multiple frames (PH echo + HP response)
- Identifies frame types by address bytes (PH/HP/SB)
- Requires EVEN parity (115200 8E1 per spec)
- Use this mode for e-hinge device communication
Session Worker:
session_worker.py- QThread wrapper for running sessions in background without blocking GUI
Supporting Modules
decoder.py- Packet decoding logic (UART telemetry and I2C angle conversion)global_clock.py- Shared timestamp source for UART/I2C synchronizationgraph_table_query.py- Database query utilities for graphingbuffer_kit/circular_buffer.py- Thread-safe circular buffer for data streamscommand_table/command_table.py- Reusable command table widget
Development Workflow
Adding New UART Commands
- Insert into
uart_commandstable via database or GUI - Commands are immediately available in session profiles and manual testing
- Format:
hex_stringas space-separated hex bytes (e.g., "DD 22 50 48 02 41 52 09")
Creating Test Profiles
- Create interface profile with port settings and packet detection config
- Create session profile with JSON command sequence:
{
"commands": [
{
"command_id": 1,
"delay_ms": 3000
},
{
"command_id": 5,
"delay_ms": 5000
}
]
}
- Use session widget to execute profile
Database Size Management
- Max size: 2 GB (configured in
init_database.py) - Monitor via status bar (shows percentage and color-coded warning)
- Use "Database > Vacuum Database" to reclaim space after deletions
Port Configuration Notes
UART Ports:
- Linux:
/dev/ttyUSB0,/dev/ttyACM0, etc. - Must have read/write permissions (add user to
dialoutgroup) - Typical baud rates: 9600, 115200
I2C Bus:
- Linux: Bus ID corresponds to
/dev/i2c-X(e.g., bus_id=1 means/dev/i2c-1) - Requires
smbus2library - Must have read/write permissions (add user to
i2cgroup)
Signal/Slot Architecture
The Session class emits PyQt signals for GUI updates:
session_started(session_id)- Session beginscommand_started(command_no, command_name)- Command startsrun_completed(run_no, packet_count)- Run finishesdelay_countdown(seconds_remaining)- Countdown during delayssession_paused()- Session pausedsession_finished()- Session completeserror_occurred(error_message)- Error encounteredstatus_changed(status_text)- General status updateraw_data_received(direction, hex_string)- TX/RX data for display
Connect to these signals in widgets for real-time updates without polling.
Testing
Individual modules can be tested directly:
# Test UART core
python uart/uart_kit/uart_core.py
# Test I2C core
python i2c/i2c_kit/i2c_core.py
# Test circular buffer
python buffer_kit/circular_buffer_test.py
Code Style Notes
- Type hints used throughout (Python 3.10+)
- Dataclasses for configuration structures
- Enums for status codes and modes
- Thread-safe operations with locks where needed
- Extensive docstrings with Args/Returns/Raises sections
Technical Challenges & Solutions
Challenge 1: GUI Threading & Responsiveness
Problem: Qt requires GUI to never block. Session execution is a long-running loop that would freeze the interface.
Solution: session_worker.py - QThread wrapper that:
- Runs session in background thread
- Creates thread-local SQLite connection (SQLite objects cannot cross threads)
- Uses Qt signals for thread-safe GUI updates
- Swaps database connections before/after execution
Critical Code:
# session_worker.py
thread_db_conn = sqlite3.connect(self.db_path) # Thread-local
self.session.db_conn = thread_db_conn # Temporary swap
self.session.start_session() # Blocking call runs in background
Challenge 2: Timestamp Correlation (UART ↔ I2C)
Problem: UART packets arrive at unpredictable times. I2C angle readings must be timestamped at the exact moment the UART packet is detected to correlate motor state with door position.
Solution: Callback-based triggering in run.py:
def on_uart_packet_detected(timestamp_ns: int):
"""Called IMMEDIATELY when packet detected"""
if i2c_port:
status, i2c_bytes = i2c_read_block(i2c_port, ...)
store_with_timestamp(timestamp_ns, i2c_bytes) # Same timestamp!
Flow:
- UART reader thread detects packet start → captures
timestamp_ns - Calls callback immediately with timestamp
- Callback reads I2C angle sensor
- Both readings share identical timestamp → perfect correlation in database
Hardware Notes:
- Two physically separate USB UART interfaces (command + logger)
- I2C read must complete before next UART packet arrives
- Blocking I2C read is acceptable if fast enough (<17 byte receive time @ baud rate)
Challenge 3: Circular Buffer Absolute Positioning
Problem: Standard circular buffers wrap around. Need to extract data spans that might cross the wrap boundary using absolute offsets.
Solution: buffer_kit/circular_buffer.py tracks write_absolute (total bytes ever written):
write_absolute: int # Monotonic counter, never wraps
position_in_buffer = write_absolute % capacity # Actual index
# Extract span handles wrapping automatically
cb_copy_span(buffer, start_abs=1000, end_abs=1050) # Works even if wrapped
Use Case: Read UART/I2C ring buffers after run completes, even if data wrapped during acquisition.
Challenge 4: Grace Period vs Stop Timeout
Problem: Device may take time to respond. Need to distinguish:
- Grace period: Wait for FIRST byte after command sent
- Stop timeout: Silence between packets within a burst
Solution in UART config:
polling_mode=True # Enable grace period
grace_timeout_ms=150 # Wait up to 150ms for first byte
stop_timeout_ms=150 # Silence between packets
Behavior:
- Send command
- Wait up to
grace_timeout_msfor first byte → If no byte: "Device not responding" - Once first byte arrives, switch to stop condition mode
- If
stop_timeout_mspasses with no data → Packet burst complete
Configured per interface profile in interface_profiles table.
Challenge 5: Buffer Sizing
Configuration: 40MB buffers for both UART and I2C
Rationale:
- Buffers cleared per run, not shared across runs
- Large buffers prevent overflow during long runs
- System has 8GB RAM + 32GB NVMe → 40MB per port is negligible
- Prevents data loss if device sends unexpected burst
Locations:
- Default:
uart_core.pyline 139,i2c_core.pyline 72 - Session overrides:
session.py_open_ports() method
Challenge 6: Error Reporting
Requirement: Track I2C read failures without failing the entire run.
Solution in run.py:
self.i2c_failures = 0 # Counter initialized
# In callback:
if i2c_read_status != OK:
self.i2c_failures += 1 # Count but continue
# At end of run:
if self.i2c_failures > 0:
raw_data_callback("ERROR", f"I2C failures: {self.i2c_failures}")
Display: Errors appear in GUI Data Monitor, not as run failure.
Challenge 7: Queued Pause/Stop
Problem: User presses Stop during run execution. Cannot stop immediately (would corrupt data, leave hardware in bad state).
Solution: Flag-based queuing:
self.stop_queued = False # Flag set by user action
# In command loop:
for cmd in commands:
if self.stop_queued: # Check BEFORE starting run
finalize_and_return()
execute_run(...) # Runs atomically
if self.stop_queued: # Check during delay
finalize_and_return()
Behavior: Pause/Stop take effect between runs, not during. Current run always completes.
Configuration Details
Stop Timeout Loading
Fixed: Stop timeout and grace period are now loaded from interface_profiles table and passed to execute_run().
Previous Issue: Hardcoded 5000ms timeout → Now uses configured value (typically 150ms)
Files Modified:
session.pyline 190: Addeduart_logger_grace_msto SQL querysession.pyline 222: Added tointerface_configdictsession.pyline 742: Changed from hardcoded 5000 tointerface_config['uart_logger_timeout_ms']session.pylines 341-349, 378-388: Full UART config with all parameterssession.pyline 476: I2C config with 40MB buffer
Typical Configuration Values
uart_logger_timeout_ms = 150 # Packet silence timeout
uart_logger_grace_ms = 150 # Wait for first byte
uart_command_timeout_ms = 1000 # Command response timeout
buffer_size = 40 * 1024 * 1024 # 40MB per port
Diagrams
Block diagrams available in diagram/ folder:
Block Diagram.xml- Overall system architecture (3 layers)run.xml- Single RUN execution flowsession.xml- Session configuration and command loop
View with draw.io or compatible tool.