You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
3.4 KiB
109 lines
3.4 KiB
from minibase.blueprints.sensor.models import nbiotDevice, nbiotDeviceStatus, nbiotDeviceType, nbiotDeviceArea
|
|
import minibase.blueprints.database.utils as dbUtils
|
|
from sqlalchemy import case
|
|
import base64
|
|
|
|
def queryImageById(id):
|
|
selected = nbiotDevice.query.filter_by(id=id).first()
|
|
return selected.image_file
|
|
|
|
|
|
def queryById(id):
|
|
return nbiotDevice.query.filter_by(id=id).first()
|
|
|
|
|
|
def queryByImsi(id):
|
|
return nbiotDevice.query.filter_by(imsi=id).first()
|
|
|
|
|
|
def queryStatusNamesWithDefault(defId):
|
|
choices = dbUtils.queryNameWithDefaultId(nbiotDeviceStatus, defId)
|
|
return choices
|
|
|
|
|
|
def queryTypeNamesWithDefault(defId):
|
|
choices = dbUtils.queryNameWithDefaultId(nbiotDeviceType, defId)
|
|
return choices
|
|
|
|
|
|
def queryAreaNamesWithDefault(defId):
|
|
choices = dbUtils.queryNameWithDefaultId(nbiotDeviceArea, defId)
|
|
return choices
|
|
|
|
|
|
def decode_payload(payload_string):
|
|
# Ensure the payload is the correct length
|
|
payload = base64.b64decode(payload_string).hex()
|
|
if len(payload) != 16:
|
|
print(f"Payload length must be 16 characters (8 bytes) current is : {len(payload)}")
|
|
raise ValueError("Payload length must be 16 characters (8 bytes)")
|
|
|
|
# Split the payload into bytes
|
|
msb_id = payload[0:2]
|
|
second_byte_id = payload[2:4]
|
|
third_byte_id = payload[4:6]
|
|
lsb_id = payload[6:8]
|
|
battery_voltage = payload[8:10]
|
|
signal_quality = payload[10:12]
|
|
input_byte = payload[12:14]
|
|
alarm_byte = payload[14:16]
|
|
|
|
# Convert hex strings to integers
|
|
msb_id = int(msb_id, 16)
|
|
second_byte_id = int(second_byte_id, 16)
|
|
third_byte_id = int(third_byte_id, 16)
|
|
lsb_id = int(lsb_id, 16)
|
|
battery_voltage = int(battery_voltage, 16)
|
|
signal_quality = int(signal_quality, 16)
|
|
input_byte = int(input_byte, 16)
|
|
alarm_byte = int(alarm_byte, 16)
|
|
|
|
# Calculate voltage in mV
|
|
voltage_mv = battery_voltage * 30
|
|
|
|
# Determine signal quality or RSSI
|
|
if signal_quality == 99:
|
|
signal_str = "Signal quality not retrieved"
|
|
rssi = None
|
|
else:
|
|
rssi = signal_quality - 110
|
|
signal_str = f"Signal Quality: {signal_quality} (RSSI: {rssi} dB)"
|
|
|
|
# Decode pin status
|
|
pins_status = {
|
|
'IN1': (input_byte & 0b00001) != 0,
|
|
'IN2': (input_byte & 0b00010) != 0,
|
|
'IN3': (input_byte & 0b00100) != 0,
|
|
'IN4': (input_byte & 0b01000) != 0,
|
|
'IN5': (input_byte & 0b10000) != 0,
|
|
}
|
|
|
|
# Decode alarm status
|
|
alarms_status = {
|
|
'IN1': (alarm_byte & 0b00001) != 0,
|
|
'IN2': (alarm_byte & 0b00010) != 0,
|
|
'IN3': (alarm_byte & 0b00100) != 0,
|
|
'IN4': (alarm_byte & 0b01000) != 0,
|
|
'IN5': (alarm_byte & 0b10000) != 0,
|
|
}
|
|
|
|
# Print decoded values
|
|
uid = str(f'{msb_id:02x}{second_byte_id:02x}{third_byte_id:02x}{lsb_id:02x}')
|
|
print(f"UID: {uid}")
|
|
print(f"Battery Voltage: {battery_voltage:#04x} (Voltage: {voltage_mv/1000} V)")
|
|
for pin in range(1, 6):
|
|
pin_status = 'High' if pins_status[f'IN{pin}'] else 'Low'
|
|
alarm_status = 'Alarm' if alarms_status[f'IN{pin}'] else 'No Alarm'
|
|
print(f"Pin IN{pin} Status: {pin_status}, {alarm_status}")
|
|
print(signal_str)
|
|
|
|
# Return decoded values in a dictionary
|
|
return {
|
|
"uid" : uid,
|
|
"battery_voltage": voltage_mv,
|
|
"signal_quality": signal_quality,
|
|
"rssi": rssi,
|
|
"pins_status": pins_status,
|
|
"alarms_status": alarms_status
|
|
}
|