You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
6.1 KiB
217 lines
6.1 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Kalman Filter for Angular Position/Velocity/Acceleration
|
|
=========================================================
|
|
1D Kalman filter for tracking door angle from noisy encoder readings.
|
|
|
|
State vector: [angle, velocity, acceleration]
|
|
Measurement: angle (from 14-bit I2C encoder)
|
|
|
|
Author: Kynsight
|
|
Version: 1.0.0
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Tuple, Optional
|
|
|
|
|
|
class AngleKalmanFilter:
|
|
"""
|
|
Kalman filter for angular position tracking.
|
|
|
|
Tracks angle (deg), angular velocity (deg/s), and angular acceleration (deg/s²).
|
|
Designed for 14-bit encoder with 0.022° quantization noise.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
process_noise: float = 0.1,
|
|
measurement_noise: float = 0.022,
|
|
initial_angle: float = 0.0
|
|
):
|
|
"""
|
|
Initialize Kalman filter.
|
|
|
|
Args:
|
|
process_noise: Process noise covariance (system uncertainty)
|
|
measurement_noise: Measurement noise (encoder quantization = 0.022°)
|
|
initial_angle: Initial angle estimate (degrees)
|
|
"""
|
|
# State vector: [angle, velocity, acceleration]
|
|
self.x = np.array([initial_angle, 0.0, 0.0])
|
|
|
|
# State covariance matrix (uncertainty in estimates)
|
|
self.P = np.eye(3) * 1.0
|
|
|
|
# Process noise covariance
|
|
self.Q = np.array([
|
|
[process_noise, 0, 0],
|
|
[0, process_noise * 10, 0], # Velocity more uncertain
|
|
[0, 0, process_noise * 100] # Acceleration even more uncertain
|
|
])
|
|
|
|
# Measurement noise covariance (14-bit encoder = 360/16384 = 0.022°)
|
|
self.R = np.array([[measurement_noise ** 2]])
|
|
|
|
# Measurement matrix (we only measure angle)
|
|
self.H = np.array([[1.0, 0.0, 0.0]])
|
|
|
|
# Previous timestamp for dt calculation
|
|
self.prev_time_ns: Optional[int] = None
|
|
|
|
# Filter initialized flag
|
|
self.initialized = False
|
|
|
|
def predict(self, dt: float):
|
|
"""
|
|
Predict step: propagate state forward in time.
|
|
|
|
State transition model:
|
|
angle(k+1) = angle(k) + velocity(k)*dt + 0.5*accel(k)*dt²
|
|
velocity(k+1) = velocity(k) + accel(k)*dt
|
|
accel(k+1) = accel(k) (constant acceleration model)
|
|
|
|
Args:
|
|
dt: Time step in seconds
|
|
"""
|
|
# State transition matrix
|
|
F = np.array([
|
|
[1.0, dt, 0.5 * dt**2],
|
|
[0.0, 1.0, dt],
|
|
[0.0, 0.0, 1.0]
|
|
])
|
|
|
|
# Predict state
|
|
self.x = F @ self.x
|
|
|
|
# Predict covariance
|
|
self.P = F @ self.P @ F.T + self.Q
|
|
|
|
def update(self, measurement: float):
|
|
"""
|
|
Update step: correct prediction with measurement.
|
|
|
|
Args:
|
|
measurement: Measured angle in degrees
|
|
"""
|
|
# Measurement residual
|
|
z = np.array([[measurement]])
|
|
y = z - self.H @ self.x.reshape(-1, 1)
|
|
|
|
# Residual covariance
|
|
S = self.H @ self.P @ self.H.T + self.R
|
|
|
|
# Kalman gain
|
|
K = self.P @ self.H.T @ np.linalg.inv(S)
|
|
|
|
# Update state
|
|
self.x = self.x + (K @ y).flatten()
|
|
|
|
# Update covariance
|
|
I = np.eye(3)
|
|
self.P = (I - K @ self.H) @ self.P
|
|
|
|
def process(
|
|
self,
|
|
angle_measurement: float,
|
|
timestamp_ns: int
|
|
) -> Tuple[float, float, float]:
|
|
"""
|
|
Process one measurement (predict + update).
|
|
|
|
Args:
|
|
angle_measurement: Raw angle measurement (degrees)
|
|
timestamp_ns: Timestamp in nanoseconds
|
|
|
|
Returns:
|
|
(filtered_angle, filtered_velocity, filtered_acceleration)
|
|
"""
|
|
if not self.initialized:
|
|
# First measurement - initialize state
|
|
self.x[0] = angle_measurement
|
|
self.x[1] = 0.0
|
|
self.x[2] = 0.0
|
|
self.prev_time_ns = timestamp_ns
|
|
self.initialized = True
|
|
return (angle_measurement, 0.0, 0.0)
|
|
|
|
# Calculate time step
|
|
dt = (timestamp_ns - self.prev_time_ns) / 1_000_000_000.0
|
|
self.prev_time_ns = timestamp_ns
|
|
|
|
if dt <= 0:
|
|
# Duplicate timestamp - return previous estimate
|
|
return (self.x[0], self.x[1], self.x[2])
|
|
|
|
# Predict
|
|
self.predict(dt)
|
|
|
|
# Update with measurement
|
|
self.update(angle_measurement)
|
|
|
|
return (self.x[0], self.x[1], self.x[2])
|
|
|
|
def reset(self, angle: float = 0.0):
|
|
"""
|
|
Reset filter to initial state.
|
|
|
|
Args:
|
|
angle: Initial angle estimate
|
|
"""
|
|
self.x = np.array([angle, 0.0, 0.0])
|
|
self.P = np.eye(3) * 1.0
|
|
self.prev_time_ns = None
|
|
self.initialized = False
|
|
|
|
def get_state(self) -> Tuple[float, float, float]:
|
|
"""
|
|
Get current state estimate.
|
|
|
|
Returns:
|
|
(angle, velocity, acceleration)
|
|
"""
|
|
return (self.x[0], self.x[1], self.x[2])
|
|
|
|
|
|
# =============================================================================
|
|
# Demo
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
print("Kalman Filter Demo")
|
|
print("=" * 60)
|
|
|
|
# Create filter
|
|
kf = AngleKalmanFilter(
|
|
process_noise=0.1,
|
|
measurement_noise=0.022, # 14-bit encoder quantization
|
|
initial_angle=-84.0
|
|
)
|
|
|
|
# Simulate noisy measurements
|
|
import random
|
|
|
|
print("\nSimulating noisy angle measurements:")
|
|
print(f"{'Time (ms)':<12} {'Raw Angle':<12} {'Filtered':<12} {'Velocity':<12} {'Accel':<12}")
|
|
print("-" * 60)
|
|
|
|
true_angle = -84.0
|
|
time_ns = 0
|
|
|
|
for i in range(20):
|
|
time_ns += 35_000_000 # ~35ms intervals (typical packet rate)
|
|
|
|
# Add quantization noise (0.022° steps)
|
|
noise = random.choice([-0.022, 0, 0.022])
|
|
measurement = true_angle + noise
|
|
|
|
# Process measurement
|
|
angle, vel, accel = kf.process(measurement, time_ns)
|
|
|
|
print(f"{time_ns/1e6:<12.1f} {measurement:<12.4f} {angle:<12.4f} {vel:<12.4f} {accel:<12.4f}")
|
|
|
|
# Slowly increase angle (simulate door opening)
|
|
true_angle += 0.02
|
|
|
|
print("\n✓ Filter smooths quantization noise while tracking motion")
|