From 552525214b0e54f466b53ec5e63371281967d1ae Mon Sep 17 00:00:00 2001 From: simonvanvalkengoed Date: Tue, 2 Dec 2025 14:52:30 +0100 Subject: [PATCH] Use the py_void_decode.py to decode files from the FC --- py_void_deocde.py | 203 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 py_void_deocde.py diff --git a/py_void_deocde.py b/py_void_deocde.py new file mode 100644 index 0000000..88e088c --- /dev/null +++ b/py_void_deocde.py @@ -0,0 +1,203 @@ +import os +from typing import BinaryIO, Tuple +import re + +START_MARKER = 0xAF +END_MARKER = 0xAF # is the same and thus no end marker but start marker can act as end of previos message + +END_ADC_FILE ="" + +def list_files_with_suffix(suffix=".bin") -> list[str]: + # Get the directory where the script is located + script_dir = os.path.dirname(os.path.abspath(__file__)) + + # List all files in that directory that end with the given suffix + matching_files = [f for f in os.listdir(script_dir) + if os.path.isfile(os.path.join(script_dir, f)) and f.endswith(suffix)] + return matching_files + +def list_files_with_phrase(phrase :str) -> list[str]: + script_dir = os.path.dirname(os.path.abspath(__file__)) + matching_files = [f for f in os.listdir(script_dir) + if os.path.isfile(os.path.join(script_dir, f)) and f.find(phrase) != -1] + +def scan_to_end_marker(f: BinaryIO, start_index: int, end_marker: int = END_MARKER) -> tuple[int]: + """ + Utility: starting from `start_index`, read forward until `end_marker` is found. + Returns the number of bytes consumed (including the end marker) or -1 on EOF/error. + Leaves the file position at the byte *after* the end marker if successful. + """ + f.seek(start_index) + bytes_read = 0 + + while True: + b = f.read(1) + if not b: + # EOF before finding end marker + return tuple(-1,) + bytes_read += 1 + if b[0] == end_marker: + return (end_marker,) + + +def decoder_1_ADC(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]: + f.seek(start_index) + channels = int.from_bytes(f.read(1),byteorder="big",signed=False) + + lst = [] + + for i in range(channels): + lst.append(int.from_bytes(f.read(3),byteorder="big",signed=False)) + + timestamp = int.from_bytes(f.read(4),byteorder="big",signed=False) + end_marker = f.read(1) + if not end_marker: + end_marker = END_MARKER + else: + end_marker = end_marker[0] + if(end_marker != END_MARKER): + print(f"start_index{start_index} end_marker:{end_marker.hex()}") + return (-1) + + with open(f"{filename}_ADC_data.csv", "a") as file: + file.write(f"{timestamp}") + for number in lst: + file.write(f",{number}") + file.write("\n") + + return (3*channels+5,(timestamp,lst)) + +LOG_MESSAGE_ENUM_LIST = [] +with open("SI_LOG_MESSAGES.txt") as file: + for message in file: + LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message)) + +def decoder_SI(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]: + f.seek(start_index) + LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False) + TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False) + try: + LogName = LOG_MESSAGE_ENUM_LIST[LogNumber] + except: + LogName = f"ERROR UNKOWN: {LogNumber}" + with open(f"{filename}_SI.csv", "a") as file: + file.write(f"{TimeStamp},{LogName}\n") + + return (6,(TimeStamp,LogNumber)) + +COMMAND_MESSAGE_ENUM_LIST = [] +with open("SI_COMMAND_MESSAGES.txt") as file: + for message in file: + COMMAND_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message)) + +def decoder_SI_command(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]: + f.seek(start_index) + LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False) + TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False) + try: + LogName = COMMAND_MESSAGE_ENUM_LIST[LogNumber] + except: + LogName = f"ERROR UNKOWN: {LogNumber}" + with open(f"{filename}_SI.csv", "a") as file: + file.write(f"{TimeStamp},{LogName}\n") + return (6,(TimeStamp,LogNumber)) + +def decode_unknown(start_index: int, f: BinaryIO, filename: str) -> int: + """ + Fallback decoder for unknown indicator values. + Strategy: consume until END_MARKER. You can also choose to return -1 instead. + """ + return scan_to_end_marker(f, start_index, END_MARKER) + + +# -------- Dispatcher that scans the file and calls the decoders -------- # + +def process_frames_in_file(filename: str) -> Tuple[int, int]: + """ + Scans `filename` for frames with START_MARKER (0xAF), reads the indicator byte, + dispatches to the appropriate decoder using structural pattern matching, and advances. + + Decoder contract: + - Signature: decoder(start_index: int, f: BinaryIO, filename: str) -> int + - `start_index` is the index of the first byte AFTER the indicator byte. + - Return the number of bytes consumed starting at `start_index` (typically includes the END_MARKER), + or -1 on failure. + + Returns: + (ok_count, fail_count): the number of frames successfully decoded and the number that failed. + """ + ok = 0 + fail = 0 + + + + try: + with open(f"{filename}.bin", "rb") as f: + # Stream through the file byte-by-byte to find START_MARKER + while True: + b = f.read(1) + if not b: + break # EOF + + if b[0] != START_MARKER: + continue # keep scanning + + # Read the indicator byte immediately after the start marker + indicator_raw = f.read(1) + if not indicator_raw: + # EOF right after a start marker; incomplete frame + fail += 1 + break + + indicator = indicator_raw[0] + start_index = f.tell() # first byte after the indicator + + # ---- Structural Pattern Matching to choose decoder ---- # + match indicator: + case 0x01: + decoder = decoder_1_ADC + case 0x02: + decoder = decoder_SI + case 0x03: + decoder = decoder_SI_command + case _: + print(f"Could not recognize indicator:{indicator} at:{start_index-1}") + decoder = decode_unknown + # ------------------------------------------------------ # + + # Call the decoder with the agreed arguments + bytes_used = decoder(start_index, f, filename)[0] + + if bytes_used == -1: + # Failed to decode; resynchronize by advancing one byte past start_index + fail += 1 + f.seek(start_index) # go to the first payload byte + _ = f.read(1) # advance 1 byte and continue scanning + else: + ok += 1 + # Advance to the next position right after the bytes we consumed + next_pos = start_index + bytes_used + f.seek(next_pos) + + return ok, fail + + except OSError as e: + # File can't be opened/read + raise RuntimeError(f"Could not process file '{filename}': {e}") from e + + + +if __name__ == "__main__": + files = list_files_with_suffix() + files_csv = list_files_with_suffix(".csv") + + for i in range(len(files_csv)): + for x in range(len(files)): + if files[x].split(".")[0] in files_csv[i]: + os.remove(files_csv[i]) + print(f"Removing file: {files_csv[i]}") + + for file in files: + print(process_frames_in_file(file.split(".")[0])) + + print(files) \ No newline at end of file