"""Decode framed binary log files (``*.bin``) into CSV files.

Frame layout as implemented below: one START_MARKER byte (0xAF), one
indicator byte selecting the decoder, then an indicator-specific payload.
There is no dedicated end marker; the start marker of the following frame
terminates the previous one.
"""
import os
import re
from typing import BinaryIO, Tuple

START_MARKER = 0xAF
# Same value as the start marker: frames have no terminator of their own, the
# start marker of the next frame acts as the end of the previous message.
END_MARKER = 0xAF
END_ADC_FILE = ""


def _list_script_dir_files(predicate) -> list[str]:
    """Return names of regular files next to this script accepted by `predicate`."""
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return [
        name
        for name in os.listdir(script_dir)
        if os.path.isfile(os.path.join(script_dir, name)) and predicate(name)
    ]


def list_files_with_suffix(suffix=".bin") -> list[str]:
    """Return the names (not paths) of files in the script directory ending in `suffix`."""
    return _list_script_dir_files(lambda name: name.endswith(suffix))


def list_files_with_phrase(phrase: str) -> list[str]:
    """Return the names (not paths) of files in the script directory containing `phrase`."""
    # The original built this list but never returned it.
    return _list_script_dir_files(lambda name: phrase in name)


def scan_to_end_marker(f: BinaryIO, start_index: int, end_marker: int = END_MARKER) -> tuple[int]:
    """
    Utility: starting from `start_index`, read forward until `end_marker` is found.

    Returns a 1-tuple holding the number of bytes consumed (including the end
    marker), or ``(-1,)`` if EOF is reached first.
    Leaves the file position at the byte *after* the end marker if successful.
    """
    f.seek(start_index)
    bytes_read = 0
    while byte := f.read(1):
        bytes_read += 1
        if byte[0] == end_marker:
            return (bytes_read,)
    # EOF before finding the end marker
    return (-1,)


def decoder_1_ADC(start_index: int, f: BinaryIO, filename: str) -> tuple:
    """
    Decode one ADC frame and append it as a row to ``<filename>_ADC_data.csv``.

    Payload layout (big-endian, unsigned): 1 byte channel count, 3 bytes per
    channel, 4 bytes timestamp. The byte after the payload must be END_MARKER
    or EOF.

    Returns ``(3 * channels + 5, (timestamp, samples))`` on success. The count
    excludes the trailing marker so that it can start the next frame.
    Returns ``(-1, ())`` if the payload is truncated or the trailing byte is
    not END_MARKER; nothing is written in that case.
    """
    f.seek(start_index)
    header = f.read(1)
    if not header:
        print(f"start_index{start_index} truncated ADC frame")
        return (-1, ())
    channels = header[0]

    body_length = 3 * channels + 4
    body = f.read(body_length)
    if len(body) != body_length:
        # A short read would otherwise be decoded as zeros and logged as data.
        print(f"start_index{start_index} truncated ADC frame")
        return (-1, ())

    samples = [
        int.from_bytes(body[offset:offset + 3], byteorder="big", signed=False)
        for offset in range(0, 3 * channels, 3)
    ]
    timestamp = int.from_bytes(body[-4:], byteorder="big", signed=False)

    # EOF right after the payload is accepted: the last frame has no successor.
    trailer = f.read(1)
    if trailer and trailer[0] != END_MARKER:
        print(f"start_index{start_index} end_marker:{trailer[0]:02x}")
        return (-1, ())

    row = ",".join(str(value) for value in (timestamp, *samples))
    with open(f"{filename}_ADC_data.csv", "a") as csv_file:
        csv_file.write(f"{row}\n")
    return (3 * channels + 5, (timestamp, samples))


def _load_enum_names(path: str) -> list[str]:
    """
    Read an enum listing, one entry per line, into a list indexed by line number.

    A missing or unreadable file yields an empty list plus a warning, so that
    decoding which does not need this table still works; every lookup then
    falls back to the "ERROR UNKOWN" label.
    """
    try:
        with open(path) as handle:
            # NOTE(review): the pattern also strips every '0' character, which
            # would mangle names containing a zero - confirm against the files.
            return [re.sub("[\n,=0 ]", "", line) for line in handle]
    except OSError as exc:
        print(f"Warning: could not read '{path}': {exc}")
        return []


def _decode_enum_frame(start_index: int, f: BinaryIO, filename: str, names: list) -> tuple:
    """Decode a 2-byte number + 4-byte timestamp frame and append it to ``<filename>_SI.csv``."""
    f.seek(start_index)
    payload = f.read(6)
    if len(payload) != 6:
        print(f"start_index{start_index} truncated SI frame")
        return (-1, ())

    log_number = int.from_bytes(payload[:2], byteorder="big", signed=False)
    time_stamp = int.from_bytes(payload[2:], byteorder="big", signed=False)
    try:
        log_name = names[log_number]
    except IndexError:
        log_name = f"ERROR UNKOWN: {log_number}"

    with open(f"{filename}_SI.csv", "a") as csv_file:
        csv_file.write(f"{time_stamp},{log_name}\n")
    return (6, (time_stamp, log_number))


LOG_MESSAGE_ENUM_LIST = _load_enum_names("SI_LOG_MESSAGES.txt")


def decoder_SI(start_index: int, f: BinaryIO, filename: str) -> tuple:
    """
    Decode one SI log frame: 2 bytes log number, 4 bytes timestamp (big-endian).

    The number is translated through LOG_MESSAGE_ENUM_LIST; numbers outside the
    table are logged as "ERROR UNKOWN: <n>". Returns ``(6, (timestamp, number))``
    or ``(-1, ())`` if fewer than 6 bytes are available.
    """
    return _decode_enum_frame(start_index, f, filename, LOG_MESSAGE_ENUM_LIST)


COMMAND_MESSAGE_ENUM_LIST = _load_enum_names("SI_COMMAND_MESSAGES.txt")


def decoder_SI_command(start_index: int, f: BinaryIO, filename: str) -> tuple:
    """
    Decode one SI command frame: 2 bytes command number, 4 bytes timestamp.

    Same as `decoder_SI` but translated through COMMAND_MESSAGE_ENUM_LIST.
    NOTE(review): rows go to the same ``<filename>_SI.csv`` as log frames -
    confirm this is intended and not a copy-paste leftover.
    """
    return _decode_enum_frame(start_index, f, filename, COMMAND_MESSAGE_ENUM_LIST)


def decode_unknown(start_index: int, f: BinaryIO, filename: str) -> tuple[int]:
    """
    Fallback decoder for unknown indicator values.

    Strategy: skip forward to the next END_MARKER. The marker itself is not
    counted, because it is also the start marker of the next frame and must
    stay visible to the dispatcher. Returns ``(-1,)`` if no marker follows.
    """
    (consumed,) = scan_to_end_marker(f, start_index, END_MARKER)
    if consumed == -1:
        return (-1,)
    return (consumed - 1,)


# Indicator byte -> decoder; anything else goes to decode_unknown.
_DECODERS = {
    0x01: decoder_1_ADC,
    0x02: decoder_SI,
    0x03: decoder_SI_command,
}


# -------- Dispatcher that scans the file and calls the decoders -------- #
def process_frames_in_file(filename: str) -> Tuple[int, int]:
    """
    Scans `filename`.bin for frames with START_MARKER (0xAF), reads the
    indicator byte, dispatches to the matching decoder, and advances.

    Decoder contract:
      - Signature: decoder(start_index: int, f: BinaryIO, filename: str) -> tuple
      - `start_index` is the index of the first byte AFTER the indicator byte.
      - Element 0 of the result is the number of bytes consumed starting at
        `start_index` (excluding the marker that starts the next frame),
        or -1 on failure.

    Args:
        filename: path of the input file without the ".bin" extension; it is
            also the prefix of the CSV files the decoders append to.

    Returns:
        (ok_count, fail_count): the number of frames successfully decoded and
        the number that failed. Frames handled by `decode_unknown` count as ok.

    Raises:
        RuntimeError: if the input (or an output CSV) cannot be opened/read.
    """
    ok = 0
    fail = 0

    try:
        with open(f"{filename}.bin", "rb") as f:
            # Stream through the file byte-by-byte to find START_MARKER
            while byte := f.read(1):
                if byte[0] != START_MARKER:
                    continue  # keep scanning

                indicator_raw = f.read(1)
                if not indicator_raw:
                    # EOF right after a start marker; incomplete frame
                    fail += 1
                    break

                indicator = indicator_raw[0]
                start_index = f.tell()  # first byte after the indicator

                decoder = _DECODERS.get(indicator)
                if decoder is None:
                    print(f"Could not recognize indicator:{indicator} at:{start_index-1}")
                    decoder = decode_unknown

                bytes_used = decoder(start_index, f, filename)[0]
                if bytes_used == -1:
                    # Failed to decode; resynchronize one byte past start_index
                    fail += 1
                    f.seek(start_index + 1)
                else:
                    ok += 1
                    f.seek(start_index + bytes_used)

        return ok, fail

    except OSError as e:
        # File can't be opened/read
        raise RuntimeError(f"Could not process file '{filename}': {e}") from e


def main() -> None:
    """Delete stale CSV output for every ``*.bin`` next to the script, then decode them all."""
    # NOTE: names are listed from the script directory but opened relative to
    # the current working directory, so run the script from its own directory.
    bin_files = list_files_with_suffix()
    # splitext keeps names such as "run.1.bin" intact ("run.1").
    stems = [os.path.splitext(name)[0] for name in bin_files]

    for csv_name in list_files_with_suffix(".csv"):
        # any() removes each CSV once even if several stems match it.
        if any(stem in csv_name for stem in stems):
            os.remove(csv_name)
            print(f"Removing file: {csv_name}")

    for stem in stems:
        print(process_frames_in_file(stem))
    print(bin_files)


if __name__ == "__main__":
    main()