You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1079 lines
33 KiB
1079 lines
33 KiB
#!/usr/bin/env python3
|
|
"""
|
|
UART Core - vzug-e-hinge
|
|
=========================
|
|
Clean UART port management with packet detection and timestamping.
|
|
|
|
Features:
|
|
- Single port per instance (no multi-port management)
|
|
- Reader thread writes to circular buffer
|
|
- Start/stop condition detection with timestamps
|
|
- Packet counting per connection
|
|
- Configurable stop conditions (timeout or terminator byte)
|
|
- Polling mode with grace period
|
|
- **NEW: Packet detection in listening mode with real-time timestamps**
|
|
- Buffer overflow tracking
|
|
- Clean connect/disconnect (no auto-reconnect)
|
|
|
|
Author: Kynsight
|
|
Version: 2.1.0 - Added packet detection for listening mode
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Optional, Callable, Tuple
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
serial = None
|
|
|
|
from buffer_kit.circular_buffer import (
|
|
Status as BufferStatus,
|
|
CircularBufferHandle,
|
|
cb_init,
|
|
cb_write,
|
|
cb_copy_span,
|
|
cb_w_abs,
|
|
cb_capacity,
|
|
cb_fill_bytes,
|
|
cb_fill_pct,
|
|
cb_overflows,
|
|
)
|
|
|
|
|
|
# Public API of this module. Names starting with an underscore (reader
# thread body, packet detector, wait loop) are internal and intentionally
# not listed here.
__all__ = [
    # Status & Configuration
    'Status',
    'StopConditionMode',
    'UARTConfig',
    'PacketConfig',
    'PacketInfo',
    'PortStatus',
    'UARTPort',

    # Lifecycle
    'uart_create',
    'uart_open',
    'uart_close',

    # Reader thread
    'uart_start_reader',
    'uart_stop_reader',

    # Write
    'uart_write',

    # Packet detection modes (blocking: request/response and polling)
    'uart_send_and_receive',
    'uart_poll_packet',

    # Listening mode (continuous capture into the circular buffer)
    'uart_start_listening',
    'uart_stop_listening',
    'uart_read_buffer',

    # Listening with packet detection (packets are framed by the reader thread)
    'uart_start_listening_with_packets',
    'uart_get_detected_packets',
    'uart_clear_detected_packets',

    # Status
    'uart_get_status',
]
|
|
|
|
|
|
# =============================================================================
|
|
# Status & Configuration
|
|
# =============================================================================
|
|
|
|
class Status(Enum):
    """Result codes returned by the UART API functions in this module."""
    OK = 0                  # Operation succeeded
    TIMEOUT = 1             # Terminator mode: deadline passed before the terminator byte was seen
    TIMEOUT_NO_DATA = 2     # Timeout mode: no byte at all arrived before the deadline
    BUFFER_OVERFLOW = 3     # Not returned by any function shown in this file
    IO_ERROR = 4            # Serial port or circular buffer operation failed (or pyserial is missing)
    BAD_CONFIG = 5          # uart_create(): empty device or non-positive baudrate
    PORT_CLOSED = 6         # Port not open, or (for packet/listening calls) reader thread not running
    ALREADY_OPEN = 7        # uart_open() called on a port that is already open
    READER_NOT_RUNNING = 8  # Not returned by any function shown in this file
|
|
|
|
|
|
class StopConditionMode(Enum):
    """How the blocking receive functions decide that a packet has ended."""
    TIMEOUT = 0     # No data for N ms (silence measured from the last received chunk)
    TERMINATOR = 1  # Specific byte received (UARTConfig.stop_terminator)
|
|
|
|
|
|
@dataclass(frozen=True)
class UARTConfig:
    """
    Immutable UART port configuration.

    device: Serial device path (e.g., "/dev/ttyUSB0"); must be non-empty
    baudrate: Baud rate (e.g., 115200); must be > 0
    data_bits: Data bits (7 or 8), passed to pyserial as ``bytesize``
    stop_bits: Stop bits (1 or 2)
    parity: Parity ('N', 'E', 'O', 'M', 'S'); uart_open() silently falls
        back to no parity for any other value
    buffer_size: RX circular buffer capacity in bytes
    read_chunk_size: Max bytes to read per reader-loop iteration

    stop_mode: TIMEOUT or TERMINATOR
    stop_timeout_ms: Timeout in ms used by the blocking receive functions
    stop_terminator: Byte value to detect (for TERMINATOR mode)

    polling_mode: If True, uart_poll_packet() first waits a grace period
        for the first byte before the stop timeout starts
    grace_timeout_ms: Max wait for first byte in polling mode

    timestamp_source: Optional external time function; when unset the port
        uses time.perf_counter
    """
    device: str
    baudrate: int
    data_bits: int = 8
    stop_bits: int = 1
    parity: str = 'N'
    # NOTE(review): the default capacity (496) is smaller than the default
    # read_chunk_size (512) - confirm this is intended.
    buffer_size: int = 496
    read_chunk_size: int = 512

    stop_mode: StopConditionMode = StopConditionMode.TIMEOUT
    stop_timeout_ms: int = 150
    stop_terminator: int = 0x0A  # Newline

    polling_mode: bool = False
    grace_timeout_ms: int = 150

    timestamp_source: Optional[Callable[[], float]] = None
|
|
|
|
|
|
@dataclass(frozen=True)
class PacketConfig:
    """
    Packet detection configuration for listening mode.

    Used by the reader thread to find fixed-length packets in the
    continuous data stream.

    Example for format: EF FE [14 bytes] EE
        PacketConfig(
            enable=True,
            start_marker=b'\\xEF\\xFE',
            packet_length=17,
            end_marker=b'\\xEE',
            on_packet_callback=my_callback_function
        )

    Fields:
        enable: Enable/disable packet detection
        start_marker: Bytes that mark packet start (e.g., b'\\xEF\\xFE').
            When unset, the stream is simply cut into packet_length-sized
            pieces.
        packet_length: Total packet length in bytes (including markers).
            The detector only emits packets of exactly this length, so it
            must be set for detection to produce anything.
        end_marker: Bytes that mark packet end (e.g., b'\\xEE'). Only
            checked when start_marker is also set.
        on_packet_callback: Optional callback function called when packet detected
            Signature: callback(timestamp_ns: int) -> None
            Called in reader thread with the packet's start timestamp;
            exceptions raised by the callback are swallowed.
    """
    enable: bool = False
    start_marker: Optional[bytes] = None
    packet_length: Optional[int] = None
    end_marker: Optional[bytes] = None
    on_packet_callback: Optional[Callable[[int], None]] = None
|
|
|
|
|
|
@dataclass
class PacketInfo:
    """
    Information about a detected packet.

    packet_id: Sequential ID taken from the port's packet counter (the
        counter is reset by uart_open() and uart_clear_detected_packets())
    start_timestamp: Packet start time. Units depend on the producer:
        - request/response and polling modes store the raw value of the
          port's timestamp source (a float; seconds for the default
          time.perf_counter)
        - listening mode with packet detection stores
          int(timestamp * 1e9), i.e. integer nanoseconds when the source
          returns seconds
    stop_timestamp: When the stop condition was met (same units as
        start_timestamp; in listening mode it is the timestamp of the
        chunk that completed the packet)
    data: Packet bytes
    stop_reason: "timeout", "terminator" or "packet_complete" are produced
        by the code in this file ("grace_timeout" is listed historically
        but not emitted by any function shown here)
    """
    packet_id: int
    start_timestamp: float
    stop_timestamp: float
    data: bytes
    stop_reason: str
|
|
|
|
|
|
@dataclass
class PortStatus:
    """
    Snapshot of the port state as returned by uart_get_status().

    is_open: Port is opened
    is_reader_running: Reader thread flag is set
    buffer_capacity: Buffer size in bytes (from cb_capacity)
    buffer_fill_bytes: Current buffer usage (from cb_fill_bytes)
    buffer_fill_percent: Fill percentage as reported by cb_fill_pct
    buffer_overflows: Number of buffer overflows (from cb_overflows)
    total_bytes_received: Total bytes since the port was opened
    total_packets: Total packets counted since the counter was last reset

    All buffer_* and total_* fields are reported as 0 while the port has
    no RX buffer (i.e. before uart_open() / after uart_close()).
    """
    is_open: bool
    is_reader_running: bool
    buffer_capacity: int
    buffer_fill_bytes: int
    buffer_fill_percent: int
    buffer_overflows: int
    total_bytes_received: int
    total_packets: int
|
|
|
|
|
|
# =============================================================================
|
|
# UART Port Handle
|
|
# =============================================================================
|
|
|
|
@dataclass
class UARTPort:
    """
    Handle for a single UART port.

    Everything except ``config`` is private bookkeeping shared between the
    caller's thread and the background reader thread. Go through the
    ``uart_*`` functions instead of touching these attributes directly.
    """
    config: UARTConfig

    # --- Serial device -----------------------------------------------------
    _serial_port: Optional[serial.Serial] = None
    _is_open: bool = False

    # --- RX circular buffer (created by uart_open) -------------------------
    _rx_buffer: Optional[CircularBufferHandle] = None

    # --- Background reader -------------------------------------------------
    _reader_thread: Optional[threading.Thread] = None
    _reader_running: bool = False
    _stop_reader_event: threading.Event = field(default_factory=threading.Event)

    # --- Counters / timestamps, guarded by _lock ---------------------------
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _total_bytes_received: int = 0
    _total_packets: int = 0
    _last_rx_timestamp: float = 0.0
    _listening_start_time: float = 0.0

    # --- Listening-mode packet detection state -----------------------------
    _packet_config: Optional[PacketConfig] = None
    _detected_packets: list = field(default_factory=list)
    _packet_buffer: bytearray = field(default_factory=bytearray)
    _packet_detection_active: bool = False
    _packet_start_timestamp: float = 0.0

    # --- Clock used for every timestamp this port produces -----------------
    _get_timestamp: Callable[[], float] = time.perf_counter

    def __post_init__(self):
        """Swap in the caller-supplied clock when the config provides one."""
        custom_clock = self.config.timestamp_source
        if custom_clock:
            self._get_timestamp = custom_clock
|
|
|
|
|
|
# =============================================================================
|
|
# Port Lifecycle
|
|
# =============================================================================
|
|
|
|
def uart_create(config: UARTConfig) -> Tuple[Status, Optional[UARTPort]]:
    """
    Build a UARTPort handle for ``config`` without touching the hardware.

    Returns:
        (Status.OK, port) when the handle was created
        (Status.BAD_CONFIG, None) when the device is empty or the baudrate
            is not positive
        (Status.IO_ERROR, None) when the pyserial module could not be imported
    """
    # Config validation comes first so a bad config is reported as such even
    # when pyserial is missing.
    if not config.device:
        return (Status.BAD_CONFIG, None)
    if config.baudrate <= 0:
        return (Status.BAD_CONFIG, None)

    # ``serial`` is None when the optional import at module load failed.
    if not serial:
        return (Status.IO_ERROR, None)

    return (Status.OK, UARTPort(config=config))
|
|
|
|
|
|
def uart_open(port: UARTPort) -> Status:
    """
    Open the serial device and allocate the RX circular buffer.

    The device is opened with a 10 ms read timeout, a 0.5 s write timeout
    and every flow-control option disabled. An unrecognised parity letter
    silently falls back to no parity. On success the byte/packet counters
    are reset.

    On any failure the port handle is left untouched: the partially opened
    serial device is closed again and ``port._serial_port`` is not assigned.

    Returns:
        Status.OK on success
        Status.ALREADY_OPEN if already open
        Status.IO_ERROR on failure (device cannot be opened, pyserial is
            unavailable, or the buffer cannot be created)
    """
    if port._is_open:
        return Status.ALREADY_OPEN

    try:
        # Map the single-letter parity code onto pyserial's constants.
        parity_map = {
            'N': serial.PARITY_NONE,
            'E': serial.PARITY_EVEN,
            'O': serial.PARITY_ODD,
            'M': serial.PARITY_MARK,
            'S': serial.PARITY_SPACE,
        }

        handle = serial.Serial(
            port=port.config.device,
            baudrate=port.config.baudrate,
            bytesize=port.config.data_bits,
            parity=parity_map.get(port.config.parity, serial.PARITY_NONE),
            stopbits=port.config.stop_bits,
            timeout=0.01,       # Short read timeout keeps the reader loop responsive
            write_timeout=0.5,
            xonxoff=False,      # Disable software flow control
            rtscts=False,       # Disable hardware (RTS/CTS) flow control
            dsrdtr=False,       # Disable hardware (DSR/DTR) flow control
        )
    except Exception:
        # Covers OS-level open failures and a missing pyserial module
        # (``serial`` is None, so the attribute lookups above raise).
        return Status.IO_ERROR

    # Create the RX buffer. A raising cb_init is treated like a failing one
    # so the freshly opened device is always released.
    try:
        status, buffer = cb_init(port.config.buffer_size)
    except Exception:
        status, buffer = None, None

    if status != BufferStatus.OK:
        try:
            handle.close()
        except Exception:
            # Already reporting failure; nothing more useful to do here.
            pass
        return Status.IO_ERROR

    # Publish the fully initialised state only once everything succeeded.
    port._serial_port = handle
    port._rx_buffer = buffer
    port._is_open = True

    # Reset statistics for the new connection.
    with port._lock:
        port._total_bytes_received = 0
        port._total_packets = 0
        port._last_rx_timestamp = 0.0

    return Status.OK
|
|
|
|
|
|
def uart_close(port: UARTPort) -> Status:
    """
    Close the serial port and release its resources.

    Stops the reader thread first if it is running, then closes the serial
    device and drops the RX buffer. Calling this on a port that is not open
    is a no-op.

    Returns:
        Status.OK (always; close errors are ignored)
    """
    if not port._is_open:
        return Status.OK

    # Stop the reader before the device it reads from goes away.
    if port._reader_running:
        uart_stop_reader(port)

    if port._serial_port:
        try:
            port._serial_port.close()
        except Exception:
            # Best-effort close: the handle is discarded either way.
            # (Only Exception is caught so Ctrl-C / SystemExit still propagate.)
            pass
        port._serial_port = None

    port._rx_buffer = None
    port._is_open = False

    return Status.OK
|
|
|
|
|
|
# =============================================================================
|
|
# Reader Thread
|
|
# =============================================================================
|
|
|
|
def uart_start_reader(port: UARTPort) -> Status:
    """
    Launch the daemon thread that drains the serial port into the RX buffer.

    Calling this while the reader flag is already set does nothing.

    Returns:
        Status.OK when the reader is running after the call
        Status.PORT_CLOSED if the port has not been opened
    """
    if not port._is_open:
        return Status.PORT_CLOSED

    if not port._reader_running:
        # Arm the stop flag and mark the reader as running before the
        # thread starts, so the loop does not exit immediately.
        port._stop_reader_event.clear()
        port._reader_running = True

        worker = threading.Thread(
            target=_reader_thread_func,
            args=(port,),
            daemon=True,
            name=f"UART-Reader-{port.config.device}",
        )
        port._reader_thread = worker
        worker.start()

    return Status.OK
|
|
|
|
|
|
def uart_stop_reader(port: UARTPort) -> Status:
    """
    Ask the reader thread to exit and wait for it.

    The wait is capped at one second; the thread reference and the running
    flag are cleared afterwards regardless of whether the join completed.

    Returns:
        Status.OK (also when no reader was running)
    """
    if port._reader_running:
        port._stop_reader_event.set()

        worker = port._reader_thread
        if worker:
            worker.join(timeout=1.0)
            port._reader_thread = None

        port._reader_running = False

    return Status.OK
|
|
|
|
|
|
def _reader_thread_func(port: UARTPort) -> None:
    """
    Body of the background reader thread.

    Loops until the port's stop event is set: whenever the serial device
    reports pending bytes, reads up to ``read_chunk_size`` of them, appends
    them to the RX circular buffer, updates the byte counter and last-RX
    timestamp, and - when listening-mode packet detection is active - feeds
    the chunk to the packet detector.

    Any exception ends the loop. NOTE: the ``_reader_running`` flag is not
    cleared in that case.
    """
    stop_requested = port._stop_reader_event.is_set

    while not stop_requested():
        try:
            # Nothing pending (or no device): back off for a millisecond.
            if not (port._serial_port and port._serial_port.in_waiting > 0):
                time.sleep(0.001)
                continue

            chunk = port._serial_port.read(port.config.read_chunk_size)
            if not chunk:
                continue

            cb_write(port._rx_buffer, chunk)

            # One timestamp per chunk, shared by statistics and detection.
            received_at = port._get_timestamp()
            with port._lock:
                port._total_bytes_received += len(chunk)
                port._last_rx_timestamp = received_at

            if port._packet_detection_active and port._packet_config:
                _detect_packets_in_chunk(port, chunk, received_at)

        except Exception:
            # IO error - exit thread
            break
|
|
|
|
|
|
def _store_detected_packet(port: UARTPort, cfg: PacketConfig, packet_bytes: bytes,
                           start_time: float, stop_time: float) -> None:
    """Record one completed listening-mode packet and fire the optional callback."""
    with port._lock:
        port._total_packets += 1
        packet_id = port._total_packets

    packet_info = PacketInfo(
        packet_id=packet_id,
        # Stored as integers scaled by 1e9 (nanoseconds when the timestamp
        # source returns seconds, as time.perf_counter does).
        start_timestamp=int(start_time * 1e9),
        stop_timestamp=int(stop_time * 1e9),
        data=packet_bytes,
        stop_reason="packet_complete",
    )
    port._detected_packets.append(packet_info)

    if cfg.on_packet_callback:
        try:
            cfg.on_packet_callback(packet_info.start_timestamp)
        except Exception:
            # Deliberate best-effort: a failing user callback must not
            # terminate the reader thread.
            pass


def _detect_packets_in_chunk(port: UARTPort, chunk: bytes, timestamp: float) -> None:
    """
    Feed a received chunk to the listening-mode packet detector.

    The chunk is appended to the port's scratch buffer, which is then
    scanned for complete packets. Each complete packet is appended to
    ``port._detected_packets`` and reported to the optional callback.

    Framing:
        - With a start marker: [START_MARKER][DATA][END_MARKER], exactly
          ``packet_length`` bytes long (example: EF FE [14 bytes] EE).
          Bytes in front of the marker are discarded. If the end marker
          does not match, one byte is dropped and the scan resumes.
        - Without a start marker: the stream is cut into consecutive
          ``packet_length``-sized packets (the end marker is not checked).

    Timestamps:
        With a start marker, the packet's start timestamp is the timestamp
        of the chunk in which the marker was first seen, even when the rest
        of the packet arrives in later chunks. The stop timestamp is the
        timestamp of the chunk that completed the packet.

    Nothing is buffered or detected when detection is disabled or
    ``packet_length`` is missing or not positive.

    Args:
        port: UART port instance
        chunk: Received data chunk
        timestamp: Timestamp when chunk received
    """
    cfg = port._packet_config
    if not cfg or not cfg.enable:
        return

    length = cfg.packet_length
    if not length or length <= 0:
        # Without a usable length no packet can ever complete. Bail out
        # before buffering so the scratch buffer cannot grow without bound
        # and the comparisons below never see None.
        return

    buf = port._packet_buffer
    buf.extend(chunk)

    start_marker = cfg.start_marker
    end_marker = cfg.end_marker

    if not start_marker:
        # Fixed-length framing: every ``length`` bytes form one packet.
        while len(buf) >= length:
            _store_detected_packet(port, cfg, bytes(buf[:length]), timestamp, timestamp)
            del buf[:length]
        return

    # Marker framing. Each iteration either stops or consumes at least one
    # byte, so the loop always terminates.
    while True:
        start_idx = buf.find(start_marker)

        if start_idx == -1:
            # No marker present. Only the last len(marker) - 1 bytes could
            # be the beginning of a marker split across chunks; drop the rest.
            keep = len(start_marker) - 1
            if len(buf) > keep:
                del buf[:len(buf) - keep]
            break

        # Align the buffer so that it starts with the marker.
        if start_idx > 0:
            del buf[:start_idx]

        # Remember when this packet's marker was first seen (0.0 = unset).
        if port._packet_start_timestamp == 0.0:
            port._packet_start_timestamp = timestamp

        if len(buf) < length:
            break  # Wait for more data

        packet_bytes = bytes(buf[:length])

        if end_marker and not packet_bytes.endswith(end_marker):
            # False start marker: skip one byte and rescan.
            del buf[0]
            port._packet_start_timestamp = 0.0
            continue

        _store_detected_packet(port, cfg, packet_bytes,
                               port._packet_start_timestamp, timestamp)

        del buf[:length]
        port._packet_start_timestamp = 0.0
|
|
|
|
|
|
# =============================================================================
|
|
# Write Operations
|
|
# =============================================================================
|
|
|
|
def uart_write(port: UARTPort, data: bytes) -> Tuple[Status, int]:
    """
    Transmit ``data`` on the serial port and flush the output.

    Returns:
        (Status.OK, n) where n is the value returned by the serial write
        (Status.PORT_CLOSED, 0) if the port is not open
        (Status.IO_ERROR, 0) if writing or flushing raised
    """
    if not (port._is_open and port._serial_port):
        return (Status.PORT_CLOSED, 0)

    try:
        sent = port._serial_port.write(data)
        port._serial_port.flush()
    except Exception:
        return (Status.IO_ERROR, 0)
    else:
        return (Status.OK, sent)
|
|
|
|
|
|
# =============================================================================
|
|
# Packet Detection (Request-Response Mode)
|
|
# =============================================================================
|
|
|
|
def uart_send_and_receive(port: UARTPort, tx_data: bytes,
                          timeout_ms: Optional[int] = None) -> Tuple[Status, Optional[PacketInfo]]:
    """
    Transmit ``tx_data`` and block until a response packet is complete.

    The RX buffer position is captured before the write, so every byte
    received from that point on belongs to the response. The end of the
    response is decided by the port's configured stop condition, and the
    timeout clock starts right after the write returns.

    Args:
        port: UART port instance (must be open with the reader running)
        tx_data: Bytes to transmit
        timeout_ms: Timeout override; the configured ``stop_timeout_ms``
            is used when omitted

    Returns:
        (Status.OK, PacketInfo) when a response was collected
        (Status.TIMEOUT_NO_DATA, None) in TIMEOUT mode when no byte arrived
        (Status.TIMEOUT, None) in TERMINATOR mode when no terminator arrived
        (Status.PORT_CLOSED, None) if the port is closed or no reader runs
        (write status, None) if the transmit itself failed
        (Status.IO_ERROR, None) if the response could not be copied out
    """
    if not (port._is_open and port._reader_running):
        return (Status.PORT_CLOSED, None)

    effective_ms = port.config.stop_timeout_ms if timeout_ms is None else timeout_ms

    # Mark where the response will begin in the RX stream.
    write_mark = cb_w_abs(port._rx_buffer)

    tx_status, _ = uart_write(port, tx_data)
    if tx_status != Status.OK:
        return (tx_status, None)

    sent_at = port._get_timestamp()

    return _wait_for_packet(
        port=port,
        start_w=write_mark,
        start_time=sent_at,
        timeout_s=effective_ms / 1000.0,
        grace_period=False,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Listening Mode (Continuous, No Auto-Stop)
|
|
# =============================================================================
|
|
|
|
def uart_start_listening(port: UARTPort) -> Status:
    """
    Enter plain listening mode (continuous capture, no stop condition).

    The reader thread already fills the RX buffer on its own; this call
    only records when listening began. Fetch the captured bytes with
    uart_read_buffer() and finish with uart_stop_listening().

    Returns:
        Status.OK on success
        Status.PORT_CLOSED if the port is closed or the reader is not running
    """
    ready = port._is_open and port._reader_running
    if not ready:
        return Status.PORT_CLOSED

    # No dedicated listening state exists - just remember the start time.
    with port._lock:
        port._listening_start_time = port._get_timestamp()

    return Status.OK
|
|
|
|
|
|
def uart_stop_listening(port: UARTPort) -> Status:
    """
    Leave listening mode.

    Clears the recorded listening start time and switches packet detection
    off. The reader thread itself keeps running and keeps filling the RX
    buffer.

    Returns:
        Status.OK
    """
    with port._lock:
        # 0.0 means "not listening".
        port._listening_start_time = 0.0

    # The reader thread stops feeding the packet detector from now on.
    port._packet_detection_active = False

    return Status.OK
|
|
|
|
|
|
def uart_start_listening_with_packets(port: UARTPort, packet_config: PacketConfig) -> Status:
    """
    Enter listening mode with packet detection in the reader thread.

    While active, the reader thread keeps filling the RX circular buffer
    and additionally:
        - frames packets according to ``packet_config``
        - stores every detected packet (id, timestamps, bytes)
        - invokes ``packet_config.on_packet_callback`` for each packet, if set

    Any partially assembled packet from an earlier session is discarded.
    Detection is only switched on when ``packet_config.enable`` is true.

    Args:
        port: UART port instance
        packet_config: Packet detection configuration

    Returns:
        Status.OK on success
        Status.PORT_CLOSED if the port is closed or the reader is not running

    Example (format EF FE [14 bytes] EE, 17 bytes total):
        def on_packet_detected(timestamp_ns: int):
            '''Runs in the reader thread as soon as a packet is complete.'''
            ...

        packet_config = PacketConfig(
            enable=True,
            start_marker=b'\\xEF\\xFE',
            packet_length=17,
            end_marker=b'\\xEE',
            on_packet_callback=on_packet_detected,  # optional
        )
        uart_start_listening_with_packets(port, packet_config)
    """
    ready = port._is_open and port._reader_running
    if not ready:
        return Status.PORT_CLOSED

    # Install the new framing rules and start from a clean detector state.
    port._packet_config = packet_config
    port._packet_buffer.clear()
    port._packet_start_timestamp = 0.0

    with port._lock:
        port._listening_start_time = port._get_timestamp()

    # Flip the switch last, once everything the reader thread needs is set.
    port._packet_detection_active = packet_config.enable

    return Status.OK
|
|
|
|
|
|
def uart_get_detected_packets(port: UARTPort) -> list:
    """
    Return a snapshot of the packets detected in listening mode so far.

    The result is a new list, so the caller may modify it freely; the
    PacketInfo objects inside are shared with the port. Each entry carries:
        - packet_id: sequential packet number (1, 2, 3, ...)
        - start_timestamp: packet start as an integer scaled by 1e9 from
          the port's timestamp source (nanoseconds for the default
          time.perf_counter clock, which is not wall-clock time)
        - data: raw packet bytes

    Returns:
        List of PacketInfo objects

    Example:
        packets = uart_get_detected_packets(port)
        print(f"Detected {len(packets)} packets")
        for pkt in packets:
            print(f"Packet {pkt.packet_id} at {pkt.start_timestamp}ns")
    """
    return list(port._detected_packets)
|
|
|
|
|
|
def uart_clear_detected_packets(port: UARTPort) -> Status:
    """
    Forget all detected packets and restart packet numbering.

    Empties the detected-packet list and the detector's scratch buffer,
    clears the pending packet start time and sets the packet counter back
    to zero. Intended to be called at the start of each run.

    Returns:
        Status.OK
    """
    # Detector state (list, partially assembled packet, pending start time).
    port._detected_packets.clear()
    port._packet_buffer.clear()
    port._packet_start_timestamp = 0.0

    # The counter is shared with the reader thread, hence the lock.
    with port._lock:
        port._total_packets = 0

    return Status.OK
|
|
|
|
|
|
def uart_read_buffer(port: UARTPort, max_bytes: int = 0) -> Tuple[Status, bytes]:
    """
    Copy the most recent bytes out of the RX buffer (for listening mode).

    The buffer is not consumed; repeated calls may return overlapping data.

    Args:
        port: UART port
        max_bytes: Maximum number of most-recent bytes to return. 0 (or a
            negative value) means "as much as the buffer holds". Values
            larger than the buffer capacity are clamped to the capacity,
            because a circular buffer cannot hold a longer span.

    Returns:
        (Status.OK, data) on success (data is empty if nothing was received)
        (Status.PORT_CLOSED, b"") if port not ready
        (Status.IO_ERROR, b"") if the span could not be copied
    """
    if not port._is_open or not port._rx_buffer:
        return (Status.PORT_CLOSED, b"")

    current_w = cb_w_abs(port._rx_buffer)
    if current_w == 0:
        return (Status.OK, b"")

    capacity = cb_capacity(port._rx_buffer)

    # Never request more than the buffer can physically contain; older
    # bytes have been overwritten once the write position has wrapped.
    window = min(max_bytes, capacity) if max_bytes > 0 else capacity
    start_w = max(0, current_w - window)

    status, data = cb_copy_span(port._rx_buffer, start_w, current_w)
    if status != BufferStatus.OK:
        return (Status.IO_ERROR, b"")

    return (Status.OK, data)
|
|
|
|
|
|
# =============================================================================
|
|
# Packet Detection (Polling Mode)
|
|
# =============================================================================
|
|
|
|
def uart_poll_packet(port: UARTPort) -> Tuple[Status, Optional[PacketInfo]]:
    """
    Block until the next incoming packet is complete (no transmit).

    Without ``polling_mode`` in the config this simply waits with the
    configured stop timeout. With ``polling_mode`` the wait has two phases:
        1. Grace period: up to ``grace_timeout_ms`` for the first byte.
        2. Stop condition: once a byte arrived - or the grace period ran
           out - the regular ``stop_timeout_ms`` wait begins and the
           configured stop condition (timeout or terminator) is evaluated.

    Returns:
        (Status.OK, PacketInfo) when a packet was collected
        (Status.TIMEOUT_NO_DATA, None) in TIMEOUT mode when no byte arrived
        (Status.TIMEOUT, None) in TERMINATOR mode when no terminator arrived
        (Status.PORT_CLOSED, None) if the port is closed or no reader runs
    """
    if not (port._is_open and port._reader_running):
        return (Status.PORT_CLOSED, None)

    cfg = port.config
    write_mark = cb_w_abs(port._rx_buffer)
    began = port._get_timestamp()
    stop_s = cfg.stop_timeout_ms / 1000.0

    if not cfg.polling_mode:
        # Plain wait: the stop timeout runs from "now".
        return _wait_for_packet(
            port=port,
            start_w=write_mark,
            start_time=began,
            timeout_s=stop_s,
            grace_period=False,
        )

    # Phase 1: give the sender a grace period to produce the first byte.
    grace_s = cfg.grace_timeout_ms / 1000.0
    got_byte = False
    while (port._get_timestamp() - began) < grace_s:
        if cb_w_abs(port._rx_buffer) > write_mark:
            got_byte = True
            break
        time.sleep(0.001)

    # Phase 2: the stop timeout starts now, with or without data.
    return _wait_for_packet(
        port=port,
        start_w=write_mark,
        start_time=port._get_timestamp(),
        timeout_s=stop_s,
        grace_period=False,
        grace_expired_no_data=(not got_byte),
    )
|
|
|
|
|
|
def _wait_for_packet(port: UARTPort, start_w: int, start_time: float,
                     timeout_s: float, grace_period: bool,
                     grace_expired_no_data: bool = False) -> Tuple[Status, Optional[PacketInfo]]:
    """
    Internal: Wait for stop condition and collect packet.

    Polls the RX buffer write position once per millisecond. Everything
    written after ``start_w`` belongs to the packet being collected.

    Stop conditions (selected by ``port.config.stop_mode``):
        TIMEOUT: after the first byte, the packet ends once no chunk has
            been received for ``timeout_s``. If no byte arrives within
            ``timeout_s`` of ``start_time`` the wait is abandoned.
        TERMINATOR: the packet ends as soon as the terminator byte value
            appears anywhere in the collected data; ``timeout_s`` after
            ``start_time`` the wait is abandoned even if data arrived.

    Args:
        port: UART port
        start_w: Buffer write position at start
        start_time: Start timestamp
        timeout_s: Timeout in seconds
        grace_period: If True, timeout starts after first byte
            (every caller visible in this file passes False, so the grace
            branch below is currently never taken)
        grace_expired_no_data: Grace expired without any byte (only
            consulted inside the grace branch)

    Returns:
        (Status.OK, PacketInfo) when the stop condition was met
        (Status.TIMEOUT_NO_DATA, None) TIMEOUT mode, nothing received
        (Status.TIMEOUT, None) TERMINATOR mode, deadline passed
        (Status.IO_ERROR, None) if the data could not be copied out
    """
    mode = port.config.stop_mode
    first_byte_seen = False
    first_byte_time = 0.0
    last_rx_time = start_time

    while True:
        now = port._get_timestamp()
        current_w = cb_w_abs(port._rx_buffer)

        # Check for new data
        if current_w > start_w:
            if not first_byte_seen:
                # Time at which this loop noticed the data, not the
                # reader thread's receive time.
                first_byte_seen = True
                first_byte_time = now

            # Update last RX time from reader thread
            with port._lock:
                last_rx_time = port._last_rx_timestamp

        # Grace period: wait for first byte before starting timeout
        if grace_period and not first_byte_seen:
            if (now - start_time) >= timeout_s:
                # Grace expired, no data
                if grace_expired_no_data:
                    return (Status.TIMEOUT_NO_DATA, None)
                # Start regular timeout now
                grace_period = False
                start_time = now
            time.sleep(0.001)
            continue

        # Timeout mode: silence timeout
        if mode == StopConditionMode.TIMEOUT:
            if first_byte_seen:
                # last_rx_time was sampled after ``now``, so silence can be
                # slightly negative right after a chunk arrives.
                silence = now - last_rx_time
                if silence >= (timeout_s):
                    # Stop condition: timeout met
                    status, data = cb_copy_span(port._rx_buffer, start_w, current_w)
                    if status != BufferStatus.OK:
                        return (Status.IO_ERROR, None)

                    return _create_packet_info(
                        port=port,
                        data=data,
                        start_time=first_byte_time,
                        stop_time=now,
                        stop_reason="timeout"
                    )
            else:
                # No byte yet, check absolute timeout
                if (now - start_time) >= timeout_s:
                    return (Status.TIMEOUT_NO_DATA, None)

        # Terminator mode: look for specific byte
        elif mode == StopConditionMode.TERMINATOR:
            if current_w > start_w:
                # Read current data (the whole span is re-copied on every poll)
                status, data = cb_copy_span(port._rx_buffer, start_w, current_w)
                if status != BufferStatus.OK:
                    return (Status.IO_ERROR, None)

                # Check for terminator; the returned data includes any
                # bytes that followed it in the same span.
                if port.config.stop_terminator in data:
                    return _create_packet_info(
                        port=port,
                        data=data,
                        start_time=first_byte_time if first_byte_seen else start_time,
                        stop_time=now,
                        stop_reason="terminator"
                    )

            # Absolute timeout fallback
            if (now - start_time) >= timeout_s:
                return (Status.TIMEOUT, None)

        time.sleep(0.001)
|
|
|
|
|
|
def _create_packet_info(port: UARTPort, data: bytes, start_time: float,
                        stop_time: float, stop_reason: str) -> Tuple[Status, PacketInfo]:
    """
    Wrap collected bytes in a PacketInfo and hand out the next packet id.

    The port's packet counter is incremented under the port lock and its
    new value becomes the packet id. Timestamps are stored as given.

    Returns:
        (Status.OK, PacketInfo)
    """
    with port._lock:
        next_id = port._total_packets + 1
        port._total_packets = next_id

    return (
        Status.OK,
        PacketInfo(
            packet_id=next_id,
            start_timestamp=start_time,
            stop_timestamp=stop_time,
            data=data,
            stop_reason=stop_reason,
        ),
    )
|
|
|
|
|
|
# =============================================================================
|
|
# Status Query
|
|
# =============================================================================
|
|
|
|
def uart_get_status(port: UARTPort) -> PortStatus:
    """
    Report the current state and metrics of the port.

    While the port has no RX buffer, every buffer metric and both counters
    are reported as zero. Otherwise the buffer metrics come from the
    circular buffer helpers and the counters are read under the port lock.

    Returns:
        PortStatus with current metrics
    """
    if port._rx_buffer:
        # Counters are shared with the reader thread; read them together.
        with port._lock:
            byte_count = port._total_bytes_received
            packet_count = port._total_packets

        metrics = dict(
            buffer_capacity=cb_capacity(port._rx_buffer),
            buffer_fill_bytes=cb_fill_bytes(port._rx_buffer),
            buffer_fill_percent=cb_fill_pct(port._rx_buffer),
            buffer_overflows=cb_overflows(port._rx_buffer),
            total_bytes_received=byte_count,
            total_packets=packet_count,
        )
    else:
        metrics = dict.fromkeys(
            (
                "buffer_capacity",
                "buffer_fill_bytes",
                "buffer_fill_percent",
                "buffer_overflows",
                "total_bytes_received",
                "total_packets",
            ),
            0,
        )

    return PortStatus(
        is_open=port._is_open,
        is_reader_running=port._reader_running,
        **metrics,
    )
|