Use the py_void_decode.py to decode files from the FC
This commit is contained in:
203
py_void_deocde.py
Normal file
203
py_void_deocde.py
Normal file
@@ -0,0 +1,203 @@
|
|||||||
|
import os
|
||||||
|
from typing import BinaryIO, Tuple
|
||||||
|
import re
|
||||||
|
|
||||||
|
START_MARKER = 0xAF
|
||||||
|
END_MARKER = 0xAF # is the same and thus no end marker but start marker can act as end of previos message
|
||||||
|
|
||||||
|
END_ADC_FILE =""
|
||||||
|
|
||||||
|
def list_files_with_suffix(suffix=".bin") -> list[str]:
|
||||||
|
# Get the directory where the script is located
|
||||||
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
|
# List all files in that directory that end with the given suffix
|
||||||
|
matching_files = [f for f in os.listdir(script_dir)
|
||||||
|
if os.path.isfile(os.path.join(script_dir, f)) and f.endswith(suffix)]
|
||||||
|
return matching_files
|
||||||
|
|
||||||
|
def list_files_with_phrase(phrase :str) -> list[str]:
|
||||||
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
matching_files = [f for f in os.listdir(script_dir)
|
||||||
|
if os.path.isfile(os.path.join(script_dir, f)) and f.find(phrase) != -1]
|
||||||
|
|
||||||
|
def scan_to_end_marker(f: BinaryIO, start_index: int, end_marker: int = END_MARKER) -> tuple[int]:
|
||||||
|
"""
|
||||||
|
Utility: starting from `start_index`, read forward until `end_marker` is found.
|
||||||
|
Returns the number of bytes consumed (including the end marker) or -1 on EOF/error.
|
||||||
|
Leaves the file position at the byte *after* the end marker if successful.
|
||||||
|
"""
|
||||||
|
f.seek(start_index)
|
||||||
|
bytes_read = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
b = f.read(1)
|
||||||
|
if not b:
|
||||||
|
# EOF before finding end marker
|
||||||
|
return tuple(-1,)
|
||||||
|
bytes_read += 1
|
||||||
|
if b[0] == end_marker:
|
||||||
|
return (end_marker,)
|
||||||
|
|
||||||
|
|
||||||
|
def decoder_1_ADC(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||||
|
f.seek(start_index)
|
||||||
|
channels = int.from_bytes(f.read(1),byteorder="big",signed=False)
|
||||||
|
|
||||||
|
lst = []
|
||||||
|
|
||||||
|
for i in range(channels):
|
||||||
|
lst.append(int.from_bytes(f.read(3),byteorder="big",signed=False))
|
||||||
|
|
||||||
|
timestamp = int.from_bytes(f.read(4),byteorder="big",signed=False)
|
||||||
|
end_marker = f.read(1)
|
||||||
|
if not end_marker:
|
||||||
|
end_marker = END_MARKER
|
||||||
|
else:
|
||||||
|
end_marker = end_marker[0]
|
||||||
|
if(end_marker != END_MARKER):
|
||||||
|
print(f"start_index{start_index} end_marker:{end_marker.hex()}")
|
||||||
|
return (-1)
|
||||||
|
|
||||||
|
with open(f"{filename}_ADC_data.csv", "a") as file:
|
||||||
|
file.write(f"{timestamp}")
|
||||||
|
for number in lst:
|
||||||
|
file.write(f",{number}")
|
||||||
|
file.write("\n")
|
||||||
|
|
||||||
|
return (3*channels+5,(timestamp,lst))
|
||||||
|
|
||||||
|
LOG_MESSAGE_ENUM_LIST = []
|
||||||
|
with open("SI_LOG_MESSAGES.txt") as file:
|
||||||
|
for message in file:
|
||||||
|
LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
|
||||||
|
|
||||||
|
def decoder_SI(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||||
|
f.seek(start_index)
|
||||||
|
LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False)
|
||||||
|
TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False)
|
||||||
|
try:
|
||||||
|
LogName = LOG_MESSAGE_ENUM_LIST[LogNumber]
|
||||||
|
except:
|
||||||
|
LogName = f"ERROR UNKOWN: {LogNumber}"
|
||||||
|
with open(f"{filename}_SI.csv", "a") as file:
|
||||||
|
file.write(f"{TimeStamp},{LogName}\n")
|
||||||
|
|
||||||
|
return (6,(TimeStamp,LogNumber))
|
||||||
|
|
||||||
|
COMMAND_MESSAGE_ENUM_LIST = []
|
||||||
|
with open("SI_COMMAND_MESSAGES.txt") as file:
|
||||||
|
for message in file:
|
||||||
|
COMMAND_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
|
||||||
|
|
||||||
|
def decoder_SI_command(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||||
|
f.seek(start_index)
|
||||||
|
LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False)
|
||||||
|
TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False)
|
||||||
|
try:
|
||||||
|
LogName = COMMAND_MESSAGE_ENUM_LIST[LogNumber]
|
||||||
|
except:
|
||||||
|
LogName = f"ERROR UNKOWN: {LogNumber}"
|
||||||
|
with open(f"{filename}_SI.csv", "a") as file:
|
||||||
|
file.write(f"{TimeStamp},{LogName}\n")
|
||||||
|
return (6,(TimeStamp,LogNumber))
|
||||||
|
|
||||||
|
def decode_unknown(start_index: int, f: BinaryIO, filename: str) -> int:
|
||||||
|
"""
|
||||||
|
Fallback decoder for unknown indicator values.
|
||||||
|
Strategy: consume until END_MARKER. You can also choose to return -1 instead.
|
||||||
|
"""
|
||||||
|
return scan_to_end_marker(f, start_index, END_MARKER)
|
||||||
|
|
||||||
|
|
||||||
|
# -------- Dispatcher that scans the file and calls the decoders -------- #
|
||||||
|
|
||||||
|
def process_frames_in_file(filename: str) -> Tuple[int, int]:
|
||||||
|
"""
|
||||||
|
Scans `filename` for frames with START_MARKER (0xAF), reads the indicator byte,
|
||||||
|
dispatches to the appropriate decoder using structural pattern matching, and advances.
|
||||||
|
|
||||||
|
Decoder contract:
|
||||||
|
- Signature: decoder(start_index: int, f: BinaryIO, filename: str) -> int
|
||||||
|
- `start_index` is the index of the first byte AFTER the indicator byte.
|
||||||
|
- Return the number of bytes consumed starting at `start_index` (typically includes the END_MARKER),
|
||||||
|
or -1 on failure.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(ok_count, fail_count): the number of frames successfully decoded and the number that failed.
|
||||||
|
"""
|
||||||
|
ok = 0
|
||||||
|
fail = 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(f"{filename}.bin", "rb") as f:
|
||||||
|
# Stream through the file byte-by-byte to find START_MARKER
|
||||||
|
while True:
|
||||||
|
b = f.read(1)
|
||||||
|
if not b:
|
||||||
|
break # EOF
|
||||||
|
|
||||||
|
if b[0] != START_MARKER:
|
||||||
|
continue # keep scanning
|
||||||
|
|
||||||
|
# Read the indicator byte immediately after the start marker
|
||||||
|
indicator_raw = f.read(1)
|
||||||
|
if not indicator_raw:
|
||||||
|
# EOF right after a start marker; incomplete frame
|
||||||
|
fail += 1
|
||||||
|
break
|
||||||
|
|
||||||
|
indicator = indicator_raw[0]
|
||||||
|
start_index = f.tell() # first byte after the indicator
|
||||||
|
|
||||||
|
# ---- Structural Pattern Matching to choose decoder ---- #
|
||||||
|
match indicator:
|
||||||
|
case 0x01:
|
||||||
|
decoder = decoder_1_ADC
|
||||||
|
case 0x02:
|
||||||
|
decoder = decoder_SI
|
||||||
|
case 0x03:
|
||||||
|
decoder = decoder_SI_command
|
||||||
|
case _:
|
||||||
|
print(f"Could not recognize indicator:{indicator} at:{start_index-1}")
|
||||||
|
decoder = decode_unknown
|
||||||
|
# ------------------------------------------------------ #
|
||||||
|
|
||||||
|
# Call the decoder with the agreed arguments
|
||||||
|
bytes_used = decoder(start_index, f, filename)[0]
|
||||||
|
|
||||||
|
if bytes_used == -1:
|
||||||
|
# Failed to decode; resynchronize by advancing one byte past start_index
|
||||||
|
fail += 1
|
||||||
|
f.seek(start_index) # go to the first payload byte
|
||||||
|
_ = f.read(1) # advance 1 byte and continue scanning
|
||||||
|
else:
|
||||||
|
ok += 1
|
||||||
|
# Advance to the next position right after the bytes we consumed
|
||||||
|
next_pos = start_index + bytes_used
|
||||||
|
f.seek(next_pos)
|
||||||
|
|
||||||
|
return ok, fail
|
||||||
|
|
||||||
|
except OSError as e:
|
||||||
|
# File can't be opened/read
|
||||||
|
raise RuntimeError(f"Could not process file '{filename}': {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
files = list_files_with_suffix()
|
||||||
|
files_csv = list_files_with_suffix(".csv")
|
||||||
|
|
||||||
|
for i in range(len(files_csv)):
|
||||||
|
for x in range(len(files)):
|
||||||
|
if files[x].split(".")[0] in files_csv[i]:
|
||||||
|
os.remove(files_csv[i])
|
||||||
|
print(f"Removing file: {files_csv[i]}")
|
||||||
|
|
||||||
|
for file in files:
|
||||||
|
print(process_frames_in_file(file.split(".")[0]))
|
||||||
|
|
||||||
|
print(files)
|
||||||
Reference in New Issue
Block a user