import serial from dataclasses import dataclass import threading from dataclasses import dataclass, field from typing import List from datetime import datetime import queue import re PORT = "COM9" SerialPort = serial.Serial(port=PORT,baudrate=250000) input_queue = queue.SimpleQueue() output_queue = queue.SimpleQueue() @dataclass class ReturnDecoder: name: str text_respons: str = "" ADC_data: List[int] = field(default_factory=lambda: []) timestamp: int = 0 log_message: str = "" bytes_read: int = 0 #length is 3*16+4+1 def ADC_FULLRANK(buffer: bytes) -> ReturnDecoder: if len(buffer)<53 : return ReturnDecoder("EMPTY") return_var = ReturnDecoder("ADC_FULLRANK") for i in range(16): return_var.ADC_data.append(int.from_bytes(bytes=buffer[3*i:3+3*i],byteorder="big",signed=True)) return_var.timestamp = int.from_bytes(bytes=buffer[48:52],byteorder="big",signed=False) if buffer[52] != 0xFA: return_var.name = f"FAULT_DECODER_END" return_var.bytes_read = 1 print(buffer.hex(sep=" ")) return_var.bytes_read = 55 return return_var LOG_MESSAGE_ENUM_LIST = [] with open("SI_LOG_MESSAGES.txt") as file: for message in file: LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message)) def SI_DECODER(buffer: bytes) -> ReturnDecoder: if(len(buffer)<7): return ReturnDecoder("EMPTY") return_var = ReturnDecoder("SI_DECODER") LogNumber = int.from_bytes(bytes=buffer[0:2],byteorder="big",signed=False) try: return_var.log_message = LOG_MESSAGE_ENUM_LIST[LogNumber] except: return_var.name = "WRONG_SI_NUMBER" return_var.log_message = LogNumber return_var.bytes_read = 1 return return_var return_var.timestamp = int.from_bytes(bytes=buffer[2:6],byteorder="big",signed=False) if buffer[6] != 0xFA: return_var.bytes_read = 1 return_var.name = f"FAULT_SI_DECODER_END" print(buffer.hex(sep=" ")) return_var.bytes_read = 7 return return_var AF_DECODERS = { 0x01: ADC_FULLRANK, 0x02: SI_DECODER } def AFDecoder(buffer: bytes) -> ReturnDecoder: if len(buffer) < 2: return ReturnDecoder("EMPTY") ret = ReturnDecoder("FAULT_DECODER") ret.bytes_read = 1 if buffer[1] in AF_DECODERS: try: return AF_DECODERS[buffer[1]](buffer[2:]) except: return ret else: print("unkown message") return ret def PRDecoder(buffer: bytes) -> ReturnDecoder: if buffer.find(b"\n") < 0: return ReturnDecoder("EMPTY") temp_str = buffer.decode("ascii",errors="replace") ret = ReturnDecoder("PR_DECODER") ret.text_respons = temp_str.split(sep='\n',maxsplit=2)[0] ret.bytes_read = len(ret.text_respons) return ret def input_loop(): while True: input_str = input()+'\n\r' if input_str.find("exit") != -1: return input_queue.put(input_str) def output_loop(): ret_DEC : ReturnDecoder = ReturnDecoder("EMPTY") buffer : bytes = bytes(b"") SerialPort.timeout = 0.1 input_str: str = "" ret_DEC.name = "EMPTY" ret_DEC.bytes_read = 0 while True: try: input_str = input_queue.get_nowait() print(f"SD: sending {input_str}") SerialPort.write(input_str.encode("ascii")) except: pass buffer = buffer + SerialPort.read_all() AF_index = buffer.find(0xAF) PR_index = buffer.find(b"PR") if AF_index >= 0 and PR_index >= 0: if AF_index < PR_index: #if(AF_index): print(buffer[:AF_index].decode("ascii","replace")) buffer = buffer[AF_index:] ret_DEC = AFDecoder(buffer) else: #if(PR_index): print(buffer[:PR_index].decode("ascii","replace")) buffer = buffer[PR_index:] ret_DEC = PRDecoder(buffer) elif AF_index >= 0: #if(AF_index): print(buffer[:AF_index].decode("ascii","replace")) buffer = buffer[AF_index:] ret_DEC = AFDecoder(buffer) elif PR_index >= 0: #if(PR_index): print(buffer[:PR_index].decode("ascii","replace")) buffer = buffer[PR_index:] ret_DEC = PRDecoder(buffer) if ret_DEC.bytes_read != 0: buffer = buffer[ret_DEC.bytes_read:] if ret_DEC.name != "EMPTY": output_queue.put(ret_DEC) ret_DEC = ReturnDecoder("EMPTY") def GetReturn(timeout: float) -> ReturnDecoder | None: try: return output_queue.get(timeout=timeout) except queue.Empty: return None def SendCommand(input_str: str): input_queue.put(input_str) def StartInputOutput() -> tuple[threading.Thread, threading.Thread]: t1 = threading.Thread(target=output_loop,daemon=True) t2 = threading.Thread(target=input_loop, daemon=True) t1.start() t2.start() return (t1,t2) def StartOutput() -> threading.Thread: t1 = threading.Thread(target=output_loop,daemon=True) t1.start() return t1 if __name__ == "__main__": t1 = threading.Thread(target=output_loop,daemon=True) t2 = threading.Thread(target=input_loop) t1.start() t2.start() while True: value = GetReturn(0.5) if t2.is_alive() == False: break if value == None: continue match value.name: case "ADC_FULLRANK": print(f"time:{value.timestamp},{value.ADC_data}") case "SI_DECODER": print(f"\033[Ftime:{value.timestamp}, SI:{value.log_message}",end="\r") case "PR_DECODER": print(f"PRD:{value.text_respons}") case _: print(f"Error {value.name}") print("stopping")