Files
Python_Decoding/Serial_decoder.py

186 lines
5.9 KiB
Python

import serial
from dataclasses import dataclass
import threading
from dataclasses import dataclass, field
from typing import List
from datetime import datetime
import queue
import re
# Serial link configuration. The port is opened as soon as this module is
# imported, so importing fails if the device is not available.
PORT = "COM7"
SerialPort = serial.Serial(PORT, baudrate=250000)

# Commands waiting to be written to the serial port (filled by input_loop
# and SendCommand, drained by output_loop).
input_queue = queue.SimpleQueue()
# Decoded ReturnDecoder records (filled by output_loop, drained by GetReturn).
output_queue = queue.SimpleQueue()
@dataclass
class ReturnDecoder:
    """Result record returned by every decoder function in this module."""

    # Decoder/result tag, e.g. "EMPTY" (not enough data yet), "ADC_FULLRANK",
    # "SI_DECODER", "PR_DECODER" or one of the fault names.
    name: str
    # Text line extracted by PRDecoder.
    text_respons: str = ""
    # Sample values filled in by ADC_FULLRANK.
    ADC_data: List[int] = field(default_factory=list)
    # Timestamp field copied from the message.
    timestamp: int = 0
    # Log message name resolved by SI_DECODER.
    log_message: str = ""
    # Number of bytes output_loop removes from the front of its buffer.
    bytes_read: int = 0
#length is 3*16+4+1
def ADC_FULLRANK(buffer: bytes) -> ReturnDecoder:
    """Decode an ADC full-rank payload.

    ``buffer`` is the message content after the two header bytes (AFDecoder
    passes ``buffer[2:]``). Layout, 53 bytes in total:

    * bytes 0-47:  16 samples, 3 bytes each, signed big-endian
    * bytes 48-51: timestamp, unsigned big-endian
    * byte 52:     end marker ``0xFA``

    Returns:
        * ``ReturnDecoder("EMPTY")`` when fewer than 53 bytes are available.
        * name ``"FAULT_DECODER_END"`` with ``bytes_read == 1`` when the end
          marker is wrong; the payload is dumped as hex to stdout.
        * name ``"ADC_FULLRANK"`` with ``bytes_read == 55`` (2 header bytes
          + 53 payload bytes) on success.
    """
    if len(buffer) < 53:
        return ReturnDecoder("EMPTY")

    return_var = ReturnDecoder("ADC_FULLRANK")
    return_var.ADC_data = [
        int.from_bytes(buffer[3 * i:3 * i + 3], byteorder="big", signed=True)
        for i in range(16)
    ]
    return_var.timestamp = int.from_bytes(buffer[48:52], byteorder="big", signed=False)

    if buffer[52] != 0xFA:
        return_var.name = "FAULT_DECODER_END"
        return_var.bytes_read = 1
        print(buffer.hex(sep=" "))
        # Return here so the 1-byte resync is not overwritten by the
        # full-frame length below.
        return return_var

    return_var.bytes_read = 55
    return return_var
# Table of log message names, one entry per line of SI_LOG_MESSAGES.txt, in
# file order. SI_DECODER indexes it with the log number from the message.
# Newlines, commas, '=', '0' and spaces are removed from every line.
# NOTE(review): the pattern also removes any '0' inside a name and leaves
# other digits in place - confirm this matches the file's format.
LOG_MESSAGE_ENUM_LIST = []
with open("SI_LOG_MESSAGES.txt") as file:
    LOG_MESSAGE_ENUM_LIST.extend(
        re.sub("[\n,=0 ]", "", message) for message in file
    )
def SI_DECODER(buffer: bytes) -> ReturnDecoder:
    """Decode a log ("SI") payload.

    ``buffer`` is the message content after the two header bytes (AFDecoder
    passes ``buffer[2:]``). Layout, 7 bytes in total:

    * bytes 0-1: log number, unsigned big-endian, used as an index into
      ``LOG_MESSAGE_ENUM_LIST``
    * bytes 2-5: timestamp, unsigned big-endian
    * byte 6:    end marker ``0xFA``

    Returns:
        * ``ReturnDecoder("EMPTY")`` when fewer than 7 bytes are available.
        * name ``"WRONG_SI_NUMBER"`` with ``bytes_read == 1`` when the log
          number is outside the table; ``log_message`` then holds the number
          as text.
        * name ``"FAULT_SI_DECODER_END"`` with ``bytes_read == 1`` when the
          end marker is wrong; the payload is dumped as hex to stdout.
        * name ``"SI_DECODER"`` with ``bytes_read == 9`` (2 header bytes
          + 7 payload bytes) on success.
    """
    if len(buffer) < 7:
        return ReturnDecoder("EMPTY")

    return_var = ReturnDecoder("SI_DECODER")
    LogNumber = int.from_bytes(buffer[0:2], byteorder="big", signed=False)
    try:
        return_var.log_message = LOG_MESSAGE_ENUM_LIST[LogNumber]
    except IndexError:
        return_var.name = "WRONG_SI_NUMBER"
        # log_message is declared as str, so store the number as text.
        return_var.log_message = str(LogNumber)
        return_var.bytes_read = 1
        return return_var

    return_var.timestamp = int.from_bytes(buffer[2:6], byteorder="big", signed=False)

    if buffer[6] != 0xFA:
        return_var.bytes_read = 1
        return_var.name = "FAULT_SI_DECODER_END"
        print(buffer.hex(sep=" "))
        # Return here so the 1-byte resync is not overwritten by the
        # full-frame length below.
        return return_var

    # The caller drops bytes_read bytes counted from the 0xAF marker, so the
    # two header bytes are included (same convention as ADC_FULLRANK's 55).
    return_var.bytes_read = 9
    return return_var
# Dispatch table used by AFDecoder: message type byte (the byte following
# the 0xAF marker) -> function that decodes the rest of the message.
AF_DECODERS = {
    0x01: ADC_FULLRANK,
    0x02: SI_DECODER,
}
def AFDecoder(buffer: bytes) -> ReturnDecoder:
    """Decode one binary message from the start of ``buffer``.

    ``buffer[0]`` is expected to be the 0xAF marker located by the caller and
    ``buffer[1]`` the message type, which selects a decoder from
    ``AF_DECODERS``. The selected decoder receives ``buffer[2:]``.

    Returns:
        * ``ReturnDecoder("EMPTY")`` when fewer than 2 bytes are available.
        * name ``"FAULT_DECODER"`` with ``bytes_read == 1`` when the type
          byte is unknown (a notice is printed) or the decoder raises.
        * otherwise whatever the selected decoder returns.
    """
    if len(buffer) < 2:
        return ReturnDecoder("EMPTY")

    fault = ReturnDecoder("FAULT_DECODER")
    fault.bytes_read = 1

    decoder = AF_DECODERS.get(buffer[1])
    if decoder is None:
        print("unkown message")
        return fault

    try:
        return decoder(buffer[2:])
    except Exception:
        # Best effort: a decoder failure is reported as a fault so the caller
        # can skip one byte and resynchronise. KeyboardInterrupt and
        # SystemExit are deliberately not caught.
        return fault
def PRDecoder(buffer: bytes) -> ReturnDecoder:
    """Extract the first text line from ``buffer``.

    Returns ``ReturnDecoder("EMPTY")`` while ``buffer`` contains no newline.
    Otherwise returns a ``"PR_DECODER"`` record whose ``text_respons`` is
    everything before the first newline, decoded as ASCII with undecodable
    bytes replaced. ``bytes_read`` is the length of that text; the newline
    itself is not counted.
    """
    newline_at = buffer.find(b"\n")
    if newline_at < 0:
        return ReturnDecoder("EMPTY")

    ret = ReturnDecoder("PR_DECODER")
    # Every byte decodes to exactly one character, so the text length equals
    # the number of bytes in front of the newline.
    ret.text_respons = buffer[:newline_at].decode("ascii", errors="replace")
    ret.bytes_read = len(ret.text_respons)
    return ret
def input_loop():
    """Forward console lines to ``input_queue`` until one contains "exit".

    Each line read with ``input()`` gets ``'\\n\\r'`` appended before it is
    queued. A line containing "exit" anywhere ends the loop and is not queued.
    """
    while True:
        line = input() + '\n\r'
        if "exit" in line:
            return
        input_queue.put(line)
def output_loop():
    """Serial worker: send queued commands, read the port, decode messages.

    Runs forever and is meant to be used as a thread target. Each pass:

    1. Sends at most one command from ``input_queue`` (ASCII-encoded).
       A failed send is reported on stdout and the loop continues.
    2. Appends newly received bytes to an internal buffer.
    3. Finds the earliest marker in the buffer, either the byte 0xAF
       (binary message, handled by AFDecoder) or the text ``PR`` (text line,
       handled by PRDecoder), discards everything before it and runs the
       decoder.
    4. Drops ``bytes_read`` bytes from the buffer and puts every result whose
       name is not ``"EMPTY"`` on ``output_queue``.
    """
    buffer = b""
    SerialPort.timeout = 0.1
    # True when the previous pass consumed nothing; only then is it worth
    # blocking on the port instead of re-scanning the same buffer.
    idle = True

    while True:
        # --- transmit -----------------------------------------------------
        try:
            input_str = input_queue.get_nowait()
        except queue.Empty:
            pass
        else:
            print(f"SD: sending {input_str}")
            try:
                SerialPort.write(input_str.encode("ascii"))
            except Exception as exc:
                # Keep the worker alive, but do not hide the failure.
                print(f"SD: send failed: {exc}")

        # --- receive ------------------------------------------------------
        pending = SerialPort.in_waiting
        if pending or idle:
            # With nothing pending, read(1) waits up to SerialPort.timeout,
            # which keeps this loop from spinning at full CPU.
            buffer += SerialPort.read(pending or 1)

        # --- decode -------------------------------------------------------
        AF_index = buffer.find(0xAF)
        PR_index = buffer.find(b"PR")
        if AF_index >= 0 and (PR_index < 0 or AF_index < PR_index):
            buffer = buffer[AF_index:]
            ret_DEC = AFDecoder(buffer)
        elif PR_index >= 0:
            buffer = buffer[PR_index:]
            ret_DEC = PRDecoder(buffer)
        else:
            # No marker at all: only the last byte can still matter (it may
            # be the "P" of a "PR" split across two reads). Dropping the
            # rest keeps the buffer from growing without bound.
            buffer = buffer[-1:]
            ret_DEC = ReturnDecoder("EMPTY")

        idle = ret_DEC.bytes_read == 0
        if not idle:
            buffer = buffer[ret_DEC.bytes_read:]
        if ret_DEC.name != "EMPTY":
            output_queue.put(ret_DEC)
def GetReturn(timeout: float) -> ReturnDecoder | None:
    """Return the next decoded record, or ``None`` if none arrives in time.

    Waits up to ``timeout`` seconds on ``output_queue``.
    """
    try:
        decoded = output_queue.get(timeout=timeout)
    except queue.Empty:
        decoded = None
    return decoded
def SendCommand(input_str: str) -> None:
    """Queue ``input_str`` for transmission by ``output_loop``.

    The string is queued unchanged; unlike ``input_loop`` no line terminator
    is appended here.
    """
    input_queue.put(input_str)
def StartInputOutput() -> tuple[threading.Thread, threading.Thread]:
    """Start the serial worker and the console reader as daemon threads.

    Returns the started threads as ``(output_loop thread, input_loop thread)``.
    """
    workers = (
        threading.Thread(target=output_loop, daemon=True),
        threading.Thread(target=input_loop, daemon=True),
    )
    for worker in workers:
        worker.start()
    return workers
def StartOutput() -> threading.Thread:
    """Start only the serial worker (``output_loop``) as a daemon thread.

    Returns the started thread.
    """
    worker = threading.Thread(target=output_loop, daemon=True)
    worker.start()
    return worker
if __name__ == "__main__":
t1 = threading.Thread(target=output_loop,daemon=True)
t2 = threading.Thread(target=input_loop)
t1.start()
t2.start()
while True:
value = GetReturn(0.5)
if t2.is_alive() == False:
break
if value == None:
continue
match value.name:
case "ADC_FULLRANK":
print(f"time:{value.timestamp},{value.ADC_data}")
case "SI_DECODER":
print(f"\033[Ftime:{value.timestamp}, SI:{value.log_message}",end="\r")
case "PR_DECODER":
print(f"PRD:{value.text_respons}")
case _:
print(f"Error {value.name}")
print("stopping")