Files
Python_Decoding/py_void_deocde.py

203 lines
7.5 KiB
Python

import os
from typing import BinaryIO, Tuple
import re
# Byte value that opens every frame in the binary capture.
START_MARKER = 0xAF
# Frames carry no dedicated terminator: END_MARKER has the same value, so the
# start marker of the next frame also acts as the end of the previous message.
END_MARKER = START_MARKER
# Empty placeholder; not referenced in the visible part of this file.
END_ADC_FILE = ""
def list_files_with_suffix(suffix=".bin") -> list[str]:
    """Return the names of regular files next to this script ending in *suffix*.

    Only bare file names are returned (no directory component), in the order
    ``os.listdir`` reports them. Directories are skipped.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    found = []
    for entry in os.listdir(here):
        if not entry.endswith(suffix):
            continue
        if os.path.isfile(os.path.join(here, entry)):
            found.append(entry)
    return found
def list_files_with_phrase(phrase: str) -> list[str]:
    """Return the names of regular files next to this script that contain *phrase*.

    Args:
        phrase: Substring to look for anywhere in the file name.

    Returns:
        Bare file names (no directory component) in ``os.listdir`` order.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # The original built this list but never returned it, so callers got None.
    return [
        name
        for name in os.listdir(script_dir)
        if phrase in name and os.path.isfile(os.path.join(script_dir, name))
    ]
def scan_to_end_marker(f: BinaryIO, start_index: int, end_marker: int = END_MARKER) -> tuple[int]:
    """Scan forward from *start_index* until a byte equal to *end_marker* is read.

    Args:
        f: Seekable binary stream to scan.
        start_index: Absolute offset at which scanning starts.
        end_marker: Byte value that terminates the scan.

    Returns:
        A one-element tuple (the dispatcher in this file indexes the result
        with ``[0]``): ``(n,)`` where ``n`` is the number of bytes consumed,
        including the end marker, or ``(-1,)`` when EOF is reached first.
        On success the stream is positioned at the byte after the marker.

    Note:
        END_MARKER has the same value as START_MARKER, so with the default
        argument the consumed marker may be the start of the next frame.
    """
    f.seek(start_index)
    bytes_read = 0
    while b := f.read(1):
        bytes_read += 1
        if b[0] == end_marker:
            # Report the byte count, not the marker value.
            return (bytes_read,)
    # EOF before finding the end marker. ``tuple(-1,)`` would raise TypeError,
    # so build the one-element tuple with a literal.
    return (-1,)
def decoder_1_ADC(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
    """Decode one ADC frame and append it as a row to ``{filename}_ADC_data.csv``.

    Payload layout starting at *start_index* (all fields big-endian, unsigned):
        1 byte      channel count ``n``
        3*n bytes   one 3-byte sample per channel
        4 bytes     timestamp
    The payload must be followed by END_MARKER or by end of file.

    Args:
        start_index: Offset of the first payload byte (just after the indicator).
        f: Seekable binary stream holding the capture.
        filename: Prefix of the CSV file the row is appended to.

    Returns:
        ``(3 * n + 5, (timestamp, samples))`` on success. The count covers the
        payload only; the trailing marker byte is peeked at but not counted.
        ``(-1, ())`` when the payload is truncated or the byte after it is not
        END_MARKER; nothing is written in that case.
    """
    failure = (-1, ())  # always a tuple so the caller can index [0]

    f.seek(start_index)
    header = f.read(1)
    if not header:
        return failure
    channels = header[0]

    body_len = 3 * channels + 4
    body = f.read(body_len)
    if len(body) != body_len:
        # Frame cut off by EOF: do not emit a row built from partial bytes.
        return failure

    samples = [
        int.from_bytes(body[offset:offset + 3], byteorder="big", signed=False)
        for offset in range(0, 3 * channels, 3)
    ]
    timestamp = int.from_bytes(body[3 * channels:], byteorder="big", signed=False)

    # EOF right after the payload is accepted: the last frame has no marker.
    trailer = f.read(1)
    if trailer and trailer[0] != END_MARKER:
        # Keep the raw bytes object for .hex(); int has no such method.
        print(f"start_index{start_index} end_marker:{trailer.hex()}")
        return failure

    with open(f"{filename}_ADC_data.csv", "a") as file:
        file.write(",".join(str(value) for value in (timestamp, *samples)) + "\n")
    return (3 * channels + 5, (timestamp, samples))
# Lookup table indexed by log number: one entry per line of SI_LOG_MESSAGES.txt
# (opened relative to the current working directory at import time).
# Every newline, comma, '=', '0' and space character is removed from each line.
# NOTE(review): this also strips zeros that are part of a name - confirm intended.
with open("SI_LOG_MESSAGES.txt") as file:
    LOG_MESSAGE_ENUM_LIST = [re.sub("[\n,=0 ]", "", message) for message in file]
def decoder_SI(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
    """Decode one SI log frame and append it to ``{filename}_SI.csv``.

    Payload layout starting at *start_index* (big-endian, unsigned):
        2 bytes  log number
        4 bytes  timestamp
    No trailing marker is read or checked.

    Args:
        start_index: Offset of the first payload byte (just after the indicator).
        f: Seekable binary stream holding the capture.
        filename: Prefix of the CSV file the row is appended to.

    Returns:
        ``(6, (timestamp, log_number))`` on success, or ``(-1, ())`` when fewer
        than six payload bytes remain (nothing is written in that case).
    """
    f.seek(start_index)
    payload = f.read(6)
    if len(payload) != 6:
        # Frame cut off by EOF: do not log a row built from partial bytes.
        return (-1, ())

    log_number = int.from_bytes(payload[:2], byteorder="big", signed=False)
    time_stamp = int.from_bytes(payload[2:], byteorder="big", signed=False)

    try:
        log_name = LOG_MESSAGE_ENUM_LIST[log_number]
    except IndexError:
        # Number beyond the loaded table; record it rather than failing.
        log_name = f"ERROR UNKOWN: {log_number}"

    with open(f"{filename}_SI.csv", "a") as file:
        file.write(f"{time_stamp},{log_name}\n")
    return (6, (time_stamp, log_number))
# Lookup table indexed by command number: one entry per line of
# SI_COMMAND_MESSAGES.txt (opened relative to the current working directory at
# import time). Every newline, comma, '=', '0' and space character is removed
# from each line.
# NOTE(review): this also strips zeros that are part of a name - confirm intended.
with open("SI_COMMAND_MESSAGES.txt") as file:
    COMMAND_MESSAGE_ENUM_LIST = [re.sub("[\n,=0 ]", "", message) for message in file]
def decoder_SI_command(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
    """Decode one SI command frame and append it to ``{filename}_SI.csv``.

    Payload layout starting at *start_index* (big-endian, unsigned):
        2 bytes  command number
        4 bytes  timestamp
    No trailing marker is read or checked.

    Args:
        start_index: Offset of the first payload byte (just after the indicator).
        f: Seekable binary stream holding the capture.
        filename: Prefix of the CSV file the row is appended to.

    Returns:
        ``(6, (timestamp, command_number))`` on success, or ``(-1, ())`` when
        fewer than six payload bytes remain (nothing is written in that case).
    """
    f.seek(start_index)
    payload = f.read(6)
    if len(payload) != 6:
        # Frame cut off by EOF: do not log a row built from partial bytes.
        return (-1, ())

    command_number = int.from_bytes(payload[:2], byteorder="big", signed=False)
    time_stamp = int.from_bytes(payload[2:], byteorder="big", signed=False)

    try:
        command_name = COMMAND_MESSAGE_ENUM_LIST[command_number]
    except IndexError:
        # Number beyond the loaded table; record it rather than failing.
        command_name = f"ERROR UNKOWN: {command_number}"

    # NOTE(review): commands go to the same "_SI.csv" file as the SI log
    # messages - confirm this interleaving is intended and not a copy-paste slip.
    with open(f"{filename}_SI.csv", "a") as file:
        file.write(f"{time_stamp},{command_name}\n")
    return (6, (time_stamp, command_number))
def decode_unknown(start_index: int, f: BinaryIO, filename: str) -> tuple[int]:
    """Fallback decoder for unknown indicator values: skip to the next marker.

    Args:
        start_index: Offset of the first byte after the indicator.
        f: Seekable binary stream holding the capture.
        filename: Unused; present to match the decoder signature.

    Returns:
        ``(n,)`` where ``n`` is the number of bytes between *start_index* and
        the next END_MARKER byte, or ``(-1,)`` when EOF is reached first.
        A tuple is returned because the dispatcher indexes the result with [0].
    """
    f.seek(start_index)
    skipped = 0
    while b := f.read(1):
        if b[0] == END_MARKER:
            # The marker itself is deliberately not counted: END_MARKER equals
            # START_MARKER, so the dispatcher must see this byte again as the
            # start of the next frame.
            return (skipped,)
        skipped += 1
    # EOF before any marker: report failure.
    return (-1,)
# -------- Dispatcher that scans the file and calls the decoders -------- #
def process_frames_in_file(filename: str) -> Tuple[int, int]:
    """Walk ``{filename}.bin`` frame by frame and hand each frame to its decoder.

    The file is scanned byte-by-byte for START_MARKER. The byte after a marker
    is the indicator that selects the decoder:
        0x01 -> decoder_1_ADC
        0x02 -> decoder_SI
        0x03 -> decoder_SI_command
        else -> decode_unknown (after printing a diagnostic)

    Decoder contract:
        - Called as ``decoder(start_index, f, filename)`` where ``start_index``
          is the offset of the first byte after the indicator.
        - The first element of the returned sequence is the number of bytes to
          skip from ``start_index``, or -1 on failure.

    Returns:
        ``(ok_count, fail_count)``: frames decoded successfully and frames that
        failed (including a marker that is the very last byte of the file).

    Raises:
        RuntimeError: if an OSError occurs while opening or processing the file.
    """
    decoded = 0
    failed = 0
    try:
        with open(f"{filename}.bin", "rb") as f:
            while b := f.read(1):
                if b[0] != START_MARKER:
                    continue  # not a frame start; keep scanning

                indicator_raw = f.read(1)
                if not indicator_raw:
                    # Marker was the last byte of the file: incomplete frame.
                    failed += 1
                    break
                indicator = indicator_raw[0]
                start_index = f.tell()  # first byte after the indicator

                if indicator == 0x01:
                    decoder = decoder_1_ADC
                elif indicator == 0x02:
                    decoder = decoder_SI
                elif indicator == 0x03:
                    decoder = decoder_SI_command
                else:
                    print(f"Could not recognize indicator:{indicator} at:{start_index-1}")
                    decoder = decode_unknown

                bytes_used = decoder(start_index, f, filename)[0]
                if bytes_used != -1:
                    decoded += 1
                    # Resume right after the bytes the decoder accounted for.
                    f.seek(start_index + bytes_used)
                    continue

                # Decode failed: step one byte past start_index and rescan.
                failed += 1
                f.seek(start_index)
                f.read(1)
    except OSError as e:
        # File can't be opened/read
        raise RuntimeError(f"Could not process file '{filename}': {e}") from e
    return decoded, failed
if __name__ == "__main__":
    # Decode every .bin capture that sits next to this script.
    files = list_files_with_suffix()
    # Strip only the trailing ".bin"; split(".")[0] would mangle dotted names
    # such as "run.v2.bin".
    stems = [name.removesuffix(".bin") for name in files]

    # The decoders append to "<stem>_*.csv", so stale output from a previous
    # run must go first. Each CSV is removed at most once (the nested loops
    # could call os.remove twice on the same file and crash), and only when its
    # name starts with "<stem>_", so unrelated CSVs are left alone.
    files_csv = list_files_with_suffix(".csv")
    for csv_name in files_csv:
        if any(csv_name.startswith(f"{stem}_") for stem in stems):
            os.remove(csv_name)
            print(f"Removing file: {csv_name}")

    for stem in stems:
        print(process_frames_in_file(stem))
    print(files)