# -----------------------
# File: components/uart/packet_detector.py
#
# Purpose
# -------
# Tiny, fast packet sniffer for fixed-format frames in a byte stream.
# Detects packets *in real time* as bytes arrive, without conversions.
#
# Default packet shape
# --------------------
# - Start: 0xEF 0xFE
# - Total length: 14 bytes
# - Last byte must be: 0xEE
#
# When a packet is detected, we invoke an optional callback:
#   on_packet(ts_ns: int, packet: bytes, abs_off_start: int) -> None
# where `ts_ns` is the detection/completion timestamp (the `t_ns` passed to
# the feed() call that delivered the last byte), and `abs_off_start` is the
# absolute offset of the first packet byte within the upstream ring buffer
# (if provided by caller).
#
# Notes
# -----
# - The detector keeps a tiny internal buffer (at most one packet) and a
#   simple FSM.
# - It is transport-agnostic: you feed() bytes from any source (UART, I2C, ...).
# - "Absolute offset" is optional. If you pass a base offset for each chunk,
#   the detector computes the packet's start offset precisely.
#

from __future__ import annotations

from typing import Optional, Callable


class PacketDetector:
    """Realtime detector for fixed-length packets in a byte stream.

    With the default settings a packet is 14 bytes long, starts with
    ``EF FE`` and ends with ``EE``. The start bytes, end byte and total
    length can be overridden through keyword arguments.

    Usage
    -----
        det = PacketDetector(on_packet=my_handler)
        det.feed(data, t_ns, abs_off_start)

    Where `abs_off_start` is the absolute ring offset of `data[0]`.

    Limitations
    -----------
    When a candidate reaches full length but its last byte is not the end
    marker, the whole candidate is discarded and the search restarts with
    the *next* incoming byte. Bytes inside the rejected candidate are not
    rescanned for another start sequence.
    """

    __slots__ = (
        "_on_packet",
        "_buf",
        "_count",
        "_searching",
        "_have_ef",
        "_pkt_len",
        "_start0",
        "_start1",
        "_end_byte",
        "_pending_first_abs_off",
        "_pending_first_ts_ns",
    )

    def __init__(
        self,
        on_packet: Optional[Callable[[int, bytes, int], None]] = None,
        *,
        start0: int = 0xEF,
        start1: int = 0xFE,
        end_byte: int = 0xEE,
        length: int = 14,
    ) -> None:
        """Create a detector.

        Parameters
        ----------
        on_packet : callable or None
            Called as ``on_packet(ts_ns, packet, abs_off_start)`` for every
            detected packet. ``None`` disables notification.
        start0, start1 : int
            First and second byte of the start sequence (masked to 8 bits).
        end_byte : int
            Required value of the last packet byte (masked to 8 bits).
        length : int
            Total packet length in bytes, including start and end bytes.

        Raises
        ------
        ValueError
            If `length` is smaller than 3. A packet needs room for the two
            start bytes plus the end byte; shorter lengths would make the
            state machine emit frames longer than requested.
        """
        pkt_len = int(length)
        if pkt_len < 3:
            raise ValueError(
                f"length must be >= 3 (two start bytes + end byte), got {pkt_len}"
            )

        self._on_packet = on_packet
        self._buf = bytearray()
        self._count = 0
        self._searching = True
        # True when the previous byte seen while searching was `start0`.
        self._have_ef = False
        self._pkt_len = pkt_len
        self._start0 = int(start0) & 0xFF
        self._start1 = int(start1) & 0xFF
        self._end_byte = int(end_byte) & 0xFF
        # Absolute offset / chunk timestamp of the current candidate's first
        # byte. The timestamp is recorded but not passed to the callback.
        self._pending_first_abs_off = 0
        self._pending_first_ts_ns = 0

    # ---------------------------
    # Configuration
    # ---------------------------
    def set_handler(self, fn: Optional[Callable[[int, bytes, int], None]]):
        """Register or clear the on_packet callback."""
        self._on_packet = fn

    # ---------------------------
    # Core detection
    # ---------------------------
    def feed(self, data: bytes, t_ns: int, abs_off_start: int = 0) -> None:
        """Consume a chunk of bytes and emit packets via callback when found.

        Parameters
        ----------
        data : bytes
            Newly received bytes. An empty chunk is ignored.
        t_ns : int
            Timestamp associated with *this chunk*. It is reported to the
            callback for every packet whose last byte lies in this chunk,
            so pass a fresh "now" if you want a completion timestamp.
        abs_off_start : int
            Absolute offset of data[0] within the upstream ring buffer.
            Used to compute the absolute start offset of detected packets.
            A start sequence split across two chunks is reported at
            ``abs_off_start - 1`` of the second chunk, i.e. offsets of
            consecutive chunks are assumed to be contiguous.

        Notes
        -----
        The detector state is reset *before* the callback runs. If the
        callback raises, the exception propagates to the caller (the rest
        of `data` is not processed), but the detector is left in a clean
        searching state and keeps working on subsequent calls.
        """
        if not data:
            return

        for i, b in enumerate(data):
            bi = b & 0xFF

            if self._searching:
                # Hunt for the two-byte start sequence without buffering.
                if self._have_ef and bi == self._start1:
                    # Start found: seed the buffer with the start sequence.
                    self._buf.clear()
                    self._buf.append(self._start0)
                    self._buf.append(self._start1)
                    self._count = 2
                    self._searching = False
                    # Offset of the first start byte, one before this byte.
                    self._pending_first_abs_off = abs_off_start + i - 1
                    self._pending_first_ts_ns = t_ns
                # Remember whether this byte could open a start sequence.
                self._have_ef = bi == self._start0
                continue

            # Accumulating a candidate packet.
            self._buf.append(bi)
            self._count += 1
            if self._count < self._pkt_len:
                continue

            # Candidate is complete; the byte just appended is its last one.
            handler = self._on_packet
            if bi == self._end_byte and handler is not None:
                # Snapshot before resetting so a raising or re-entrant
                # callback cannot leave the state machine half-updated.
                packet = bytes(self._buf)
                first_off = self._pending_first_abs_off
                self.reset()
                handler(t_ns, packet, first_off)
            else:
                # Wrong end byte (or nobody listening): drop the candidate.
                self.reset()

    def reset(self) -> None:
        """Clear detector state (used on reconnects, etc.)."""
        self._buf.clear()
        self._count = 0
        self._searching = True
        self._have_ef = False
        self._pending_first_abs_off = 0
        self._pending_first_ts_ns = 0