Compare commits
4 Commits
0c7f46f610
...
4a0f3aaab6
| Author | SHA1 | Date | |
|---|---|---|---|
| 4a0f3aaab6 | |||
| 0bc82518d7 | |||
| 552525214b | |||
| 32e8ab0b24 |
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
/.venv
|
||||
/SaveData
|
||||
/__pycache__
|
||||
0
COMMANDS.txt
Normal file
0
COMMANDS.txt
Normal file
19
SI_COMMAND_MESSAGES.txt
Normal file
19
SI_COMMAND_MESSAGES.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
STOP_STREAM
|
||||
START_STREAM
|
||||
RESET
|
||||
WIPE_STM
|
||||
UPTIME
|
||||
PING
|
||||
GET_STATE
|
||||
ARM
|
||||
DISARM
|
||||
STATIC_FIRE1
|
||||
STATIC_FIRE2
|
||||
STATIC_FIRE3
|
||||
ABORT
|
||||
SERVO_SWEEP
|
||||
SERVO_SET
|
||||
GET_READINGS
|
||||
SET_ST_SPEED
|
||||
START_FLASH
|
||||
START_USB
|
||||
27
SI_LOG_MESSAGES.txt
Normal file
27
SI_LOG_MESSAGES.txt
Normal file
@@ -0,0 +1,27 @@
|
||||
EMPTY_LOG = 0,
|
||||
LOGGING_DATA,
|
||||
FLASH_ERROR,
|
||||
FLASH_START_ERROR,
|
||||
FLASH_FULL,
|
||||
FLASH_LOG_FULL,
|
||||
ENTER_STATIC_FIRE,
|
||||
EXIT_STATIC_FIRE,
|
||||
ENTER_ARM,
|
||||
UPDATE_ARM,
|
||||
EXIT_ARM,
|
||||
LOX_OPEN,
|
||||
LOX_OPEN_FULL,
|
||||
ETHANOL_OPEN,
|
||||
ETHANOL_OPEN_FULL,
|
||||
NITROGEN_OPEN,
|
||||
NITROGEN_OPEN_FULL,
|
||||
LOX_CLOSING,
|
||||
LOX_FULL_CLOSED,
|
||||
ETHANOL_CLOSING,
|
||||
ETHANOL_FULL_CLOSED,
|
||||
NITROGEN_CLOSING,
|
||||
NITROGEN_FULL_CLOSED,
|
||||
ETHANOL_DRAIN_OPEN,
|
||||
LOX_DRAIN_OPEN,
|
||||
ETHANOL_DRAIN_CLOSED,
|
||||
LOX_DRAIN_CLOSED
|
||||
185
Serial_decoder.py
Normal file
185
Serial_decoder.py
Normal file
@@ -0,0 +1,185 @@
|
||||
import serial
|
||||
from dataclasses import dataclass
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
from datetime import datetime
|
||||
import queue
|
||||
import re
|
||||
|
||||
PORT = "COM9"
|
||||
|
||||
SerialPort = serial.Serial(port=PORT,baudrate=250000)
|
||||
input_queue = queue.SimpleQueue()
|
||||
output_queue = queue.SimpleQueue()
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReturnDecoder:
|
||||
name: str
|
||||
text_respons: str = ""
|
||||
ADC_data: List[int] = field(default_factory=lambda: [])
|
||||
timestamp: int = 0
|
||||
log_message: str = ""
|
||||
bytes_read: int = 0
|
||||
|
||||
|
||||
|
||||
#length is 3*16+4+1
|
||||
def ADC_FULLRANK(buffer: bytes) -> ReturnDecoder:
|
||||
if len(buffer)<53 : return ReturnDecoder("EMPTY")
|
||||
return_var = ReturnDecoder("ADC_FULLRANK")
|
||||
for i in range(16):
|
||||
return_var.ADC_data.append(int.from_bytes(bytes=buffer[3*i:3+3*i],byteorder="big",signed=True))
|
||||
return_var.timestamp = int.from_bytes(bytes=buffer[48:52],byteorder="big",signed=False)
|
||||
if buffer[52] != 0xFA:
|
||||
return_var.name = f"FAULT_DECODER_END"
|
||||
return_var.bytes_read = 1
|
||||
print(buffer.hex(sep=" "))
|
||||
return_var.bytes_read = 55
|
||||
return return_var
|
||||
|
||||
LOG_MESSAGE_ENUM_LIST = []
|
||||
with open("SI_LOG_MESSAGES.txt") as file:
|
||||
for message in file:
|
||||
LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
|
||||
|
||||
def SI_DECODER(buffer: bytes) -> ReturnDecoder:
|
||||
if(len(buffer)<7): return ReturnDecoder("EMPTY")
|
||||
return_var = ReturnDecoder("SI_DECODER")
|
||||
LogNumber = int.from_bytes(bytes=buffer[0:2],byteorder="big",signed=False)
|
||||
try:
|
||||
return_var.log_message = LOG_MESSAGE_ENUM_LIST[LogNumber]
|
||||
except:
|
||||
return_var.name = "WRONG_SI_NUMBER"
|
||||
return_var.log_message = LogNumber
|
||||
return_var.bytes_read = 1
|
||||
return return_var
|
||||
return_var.timestamp = int.from_bytes(bytes=buffer[2:6],byteorder="big",signed=False)
|
||||
if buffer[6] != 0xFA:
|
||||
return_var.bytes_read = 1
|
||||
return_var.name = f"FAULT_SI_DECODER_END"
|
||||
print(buffer.hex(sep=" "))
|
||||
return_var.bytes_read = 7
|
||||
return return_var
|
||||
|
||||
|
||||
AF_DECODERS = {
|
||||
0x01: ADC_FULLRANK,
|
||||
0x02: SI_DECODER
|
||||
}
|
||||
|
||||
def AFDecoder(buffer: bytes) -> ReturnDecoder:
|
||||
if len(buffer) < 2: return ReturnDecoder("EMPTY")
|
||||
ret = ReturnDecoder("FAULT_DECODER")
|
||||
ret.bytes_read = 1
|
||||
if buffer[1] in AF_DECODERS:
|
||||
try:
|
||||
return AF_DECODERS[buffer[1]](buffer[2:])
|
||||
except:
|
||||
return ret
|
||||
else:
|
||||
print("unkown message")
|
||||
return ret
|
||||
|
||||
def PRDecoder(buffer: bytes) -> ReturnDecoder:
|
||||
if buffer.find(b"\n") < 0:
|
||||
return ReturnDecoder("EMPTY")
|
||||
temp_str = buffer.decode("ascii",errors="replace")
|
||||
ret = ReturnDecoder("PR_DECODER")
|
||||
ret.text_respons = temp_str.split(sep='\n',maxsplit=2)[0]
|
||||
ret.bytes_read = len(ret.text_respons)
|
||||
return ret
|
||||
|
||||
def input_loop():
|
||||
while True:
|
||||
input_str = input()+'\n\r'
|
||||
if input_str.find("exit") != -1:
|
||||
return
|
||||
input_queue.put(input_str)
|
||||
|
||||
def output_loop():
|
||||
ret_DEC : ReturnDecoder = ReturnDecoder("EMPTY")
|
||||
buffer : bytes = bytes(b"")
|
||||
SerialPort.timeout = 0.1
|
||||
input_str: str = ""
|
||||
ret_DEC.name = "EMPTY"
|
||||
ret_DEC.bytes_read = 0
|
||||
while True:
|
||||
try:
|
||||
input_str = input_queue.get_nowait()
|
||||
print(f"SD: sending {input_str}")
|
||||
SerialPort.write(input_str.encode("ascii"))
|
||||
except:
|
||||
pass
|
||||
buffer = buffer + SerialPort.read_all()
|
||||
AF_index = buffer.find(0xAF)
|
||||
PR_index = buffer.find(b"PR")
|
||||
if AF_index >= 0 and PR_index >= 0:
|
||||
if AF_index < PR_index:
|
||||
#if(AF_index): print(buffer[:AF_index].decode("ascii","replace"))
|
||||
buffer = buffer[AF_index:]
|
||||
ret_DEC = AFDecoder(buffer)
|
||||
else:
|
||||
#if(PR_index): print(buffer[:PR_index].decode("ascii","replace"))
|
||||
buffer = buffer[PR_index:]
|
||||
ret_DEC = PRDecoder(buffer)
|
||||
elif AF_index >= 0:
|
||||
#if(AF_index): print(buffer[:AF_index].decode("ascii","replace"))
|
||||
buffer = buffer[AF_index:]
|
||||
ret_DEC = AFDecoder(buffer)
|
||||
elif PR_index >= 0:
|
||||
#if(PR_index): print(buffer[:PR_index].decode("ascii","replace"))
|
||||
buffer = buffer[PR_index:]
|
||||
ret_DEC = PRDecoder(buffer)
|
||||
|
||||
if ret_DEC.bytes_read != 0:
|
||||
buffer = buffer[ret_DEC.bytes_read:]
|
||||
if ret_DEC.name != "EMPTY":
|
||||
output_queue.put(ret_DEC)
|
||||
ret_DEC = ReturnDecoder("EMPTY")
|
||||
|
||||
|
||||
def GetReturn(timeout: float) -> ReturnDecoder | None:
|
||||
try:
|
||||
return output_queue.get(timeout=timeout)
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
def SendCommand(input_str: str):
|
||||
input_queue.put(input_str)
|
||||
|
||||
def StartInputOutput() -> tuple[threading.Thread, threading.Thread]:
|
||||
t1 = threading.Thread(target=output_loop,daemon=True)
|
||||
t2 = threading.Thread(target=input_loop, daemon=True)
|
||||
t1.start()
|
||||
t2.start()
|
||||
return (t1,t2)
|
||||
|
||||
def StartOutput() -> threading.Thread:
|
||||
t1 = threading.Thread(target=output_loop,daemon=True)
|
||||
t1.start()
|
||||
return t1
|
||||
|
||||
if __name__ == "__main__":
|
||||
t1 = threading.Thread(target=output_loop,daemon=True)
|
||||
t2 = threading.Thread(target=input_loop)
|
||||
t1.start()
|
||||
t2.start()
|
||||
while True:
|
||||
value = GetReturn(0.5)
|
||||
if t2.is_alive() == False:
|
||||
break
|
||||
if value == None:
|
||||
continue
|
||||
match value.name:
|
||||
case "ADC_FULLRANK":
|
||||
print(f"time:{value.timestamp},{value.ADC_data}")
|
||||
case "SI_DECODER":
|
||||
print(f"\033[Ftime:{value.timestamp}, SI:{value.log_message}",end="\r")
|
||||
case "PR_DECODER":
|
||||
print(f"PRD:{value.text_respons}")
|
||||
case _:
|
||||
print(f"Error {value.name}")
|
||||
print("stopping")
|
||||
|
||||
168
mainplot.py
Normal file
168
mainplot.py
Normal file
@@ -0,0 +1,168 @@
|
||||
|
||||
import Serial_decoder as SD
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from datetime import datetime
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
# Matplotlib embedding in Tkinter
|
||||
from matplotlib.figure import Figure
|
||||
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||
|
||||
# === Serial threads (unchanged) ===
|
||||
tasks = SD.StartInputOutput() # t1: output_loop (daemon), t2: input_loop (daemon)
|
||||
FILENAME = f"SaveData/{datetime.today().strftime('%Y_%m_%d_%H_%M')}"
|
||||
|
||||
# === Plot history ===
|
||||
MAX_POINTS = 100 # rolling window length
|
||||
history_x = deque(maxlen=MAX_POINTS) # timestamps
|
||||
history_y = [deque(maxlen=MAX_POINTS) for _ in range(16)] # per-channel values
|
||||
|
||||
# === GUI setup ===
|
||||
root = tk.Tk()
|
||||
root.title("Live 4x4 Grid + Selectable Real-Time Plot")
|
||||
|
||||
# Left: 4x4 grid
|
||||
grid_frame = ttk.Frame(root)
|
||||
grid_frame.grid(row=0, column=0, padx=10, pady=10, sticky="n")
|
||||
|
||||
labels = []
|
||||
for row in range(4):
|
||||
for col in range(4):
|
||||
idx = row * 4 + col
|
||||
lbl = tk.Label(
|
||||
grid_frame, name=f"channel{idx}",
|
||||
text="0", width=10, height=3,
|
||||
borderwidth=2, relief="groove", font=("Arial", 14)
|
||||
)
|
||||
lbl.grid(row=row, column=col, padx=5, pady=5)
|
||||
labels.append(lbl)
|
||||
|
||||
# Last tick label (as you had)
|
||||
last_tick_lbl = tk.Label(
|
||||
grid_frame, text="Last Tick: -", width=20, height=3,
|
||||
borderwidth=2, relief="groove", font=("Arial", 14)
|
||||
)
|
||||
lbl = tk.Label(root, text="Last Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14))
|
||||
lbl.grid(row = 4, column=0, columnspan=2)
|
||||
labels.append(lbl)
|
||||
lbl = tk.Label(root, text="Delta Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14))
|
||||
lbl.grid(row=4, column=2, columnspan= 2)
|
||||
labels.append(lbl)
|
||||
|
||||
# Checkboxes to select plotted channels
|
||||
check_frame = ttk.LabelFrame(root, text="Plot Selection")
|
||||
check_frame.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="nw")
|
||||
|
||||
check_vars = []
|
||||
for i in range(16):
|
||||
var = tk.IntVar(value=0)
|
||||
chk = ttk.Checkbutton(check_frame, text=f"Ch {i+1}", variable=var, command=lambda: update_plot())
|
||||
chk.grid(row=i//4, column=i%4, sticky="w", padx=4, pady=2)
|
||||
check_vars.append(var)
|
||||
|
||||
# Right: Matplotlib plot
|
||||
plot_frame = ttk.Frame(root)
|
||||
plot_frame.grid(row=0, column=1, rowspan=2, padx=10, pady=10, sticky="n")
|
||||
|
||||
fig = Figure(figsize=(6, 4), dpi=100)
|
||||
ax = fig.add_subplot(111)
|
||||
ax.set_title("Selected Channels (rolling)")
|
||||
ax.set_xlabel("Timestamp")
|
||||
ax.set_ylabel("Value")
|
||||
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
|
||||
canvas.get_tk_widget().pack(fill="both", expand=True)
|
||||
|
||||
# Optional controls
|
||||
controls_frame = ttk.Frame(plot_frame)
|
||||
controls_frame.pack(fill="x", pady=5)
|
||||
# Clear selection button
|
||||
def clear_selection():
|
||||
for v in check_vars:
|
||||
v.set(0)
|
||||
update_plot()
|
||||
ttk.Button(controls_frame, text="Clear Selection", command=clear_selection).pack(side="left")
|
||||
|
||||
# === Update functions ===
|
||||
Last_tick: int = 0
|
||||
def update_grid(values: list,tick: int):
|
||||
global Last_tick
|
||||
with open(f"{FILENAME}_ADC.csv","+a") as file:
|
||||
file.write(f"{tick}")
|
||||
for i in range(16):
|
||||
file.write(f",{values[i]}")
|
||||
labels[i].config(text=f"{values[i]:n}")
|
||||
labels[16].config(text=f"Last Tick:{tick:n}")
|
||||
labels[17].config(text=f"Delta Tick:{(tick-Last_tick):n}")
|
||||
Last_tick = tick
|
||||
file.write("\n")
|
||||
|
||||
|
||||
def update_SI(value:SD.ReturnDecoder):
|
||||
with open(f"{FILENAME}_SI.csv","+a") as file:
|
||||
file.write(f"{value.timestamp},{value.log_message}\n")
|
||||
print(f"\033[F\033[2Ktime:{value.timestamp}, SI:{value.log_message}",end="\n")
|
||||
|
||||
def append_history(adc_values: list, timestamp: int):
|
||||
"""Store the new sample in the rolling history."""
|
||||
history_x.append(timestamp)
|
||||
for i in range(16):
|
||||
history_y[i].append(adc_values[i])
|
||||
|
||||
def update_plot():
|
||||
"""Redraw plot based on selected channels and available history."""
|
||||
ax.clear()
|
||||
ax.set_title("Selected Channels (rolling)")
|
||||
ax.set_xlabel("Timestamp")
|
||||
ax.set_ylabel("Value")
|
||||
|
||||
selected = [i for i, var in enumerate(check_vars) if var.get() == 1]
|
||||
if len(history_x) == 0 or len(selected) == 0:
|
||||
ax.text(0.5, 0.5, "No data / No channels selected", ha="center", va="center", transform=ax.transAxes)
|
||||
canvas.draw()
|
||||
return
|
||||
|
||||
x = list(history_x)
|
||||
for idx in selected:
|
||||
y = list(history_y[idx])
|
||||
ax.plot(x, y, label=f"Ch {idx+1}")
|
||||
ax.legend(loc="upper left", fontsize=8)
|
||||
ax.grid(True, linestyle="--", alpha=0.3)
|
||||
canvas.draw()
|
||||
|
||||
def update_panel():
|
||||
"""Main polling loop that consumes Serial_decoder outputs (unchanged structure)."""
|
||||
|
||||
values: SD.ReturnDecoder = SD.GetReturn(0.1) # non-blocking read with timeout
|
||||
if values is None:
|
||||
root.after(10, update_panel)
|
||||
return
|
||||
|
||||
# Stop if input thread died
|
||||
if tasks[1].is_alive() == False:
|
||||
root.quit()
|
||||
|
||||
match values.name:
|
||||
case "ADC_FULLRANK":
|
||||
# Update grid + history + plot
|
||||
update_grid(values=values.ADC_data, tick=values.timestamp)
|
||||
append_history(values.ADC_data, values.timestamp)
|
||||
update_plot() # draw right away; if too heavy, throttle with a timer
|
||||
case "SI_DECODER":
|
||||
update_SI(values)
|
||||
case "PR_DECODER":
|
||||
print(f"PRD:{values.text_respons}")
|
||||
case _:
|
||||
print(f"Error {values.name}")
|
||||
|
||||
# Continue polling
|
||||
root.after(10, update_panel) # 10 ms
|
||||
|
||||
# Start polling and GUI loop
|
||||
update_panel()
|
||||
|
||||
# FC commands
|
||||
SD.SerialPort.write(b"SET_ST_SPEED 100\n\r")
|
||||
|
||||
root.mainloop()
|
||||
203
py_void_deocde.py
Normal file
203
py_void_deocde.py
Normal file
@@ -0,0 +1,203 @@
|
||||
import os
|
||||
from typing import BinaryIO, Tuple
|
||||
import re
|
||||
|
||||
START_MARKER = 0xAF
|
||||
END_MARKER = 0xAF # is the same and thus no end marker but start marker can act as end of previos message
|
||||
|
||||
END_ADC_FILE =""
|
||||
|
||||
def list_files_with_suffix(suffix=".bin") -> list[str]:
|
||||
# Get the directory where the script is located
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
# List all files in that directory that end with the given suffix
|
||||
matching_files = [f for f in os.listdir(script_dir)
|
||||
if os.path.isfile(os.path.join(script_dir, f)) and f.endswith(suffix)]
|
||||
return matching_files
|
||||
|
||||
def list_files_with_phrase(phrase :str) -> list[str]:
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
matching_files = [f for f in os.listdir(script_dir)
|
||||
if os.path.isfile(os.path.join(script_dir, f)) and f.find(phrase) != -1]
|
||||
|
||||
def scan_to_end_marker(f: BinaryIO, start_index: int, end_marker: int = END_MARKER) -> tuple[int]:
|
||||
"""
|
||||
Utility: starting from `start_index`, read forward until `end_marker` is found.
|
||||
Returns the number of bytes consumed (including the end marker) or -1 on EOF/error.
|
||||
Leaves the file position at the byte *after* the end marker if successful.
|
||||
"""
|
||||
f.seek(start_index)
|
||||
bytes_read = 0
|
||||
|
||||
while True:
|
||||
b = f.read(1)
|
||||
if not b:
|
||||
# EOF before finding end marker
|
||||
return tuple(-1,)
|
||||
bytes_read += 1
|
||||
if b[0] == end_marker:
|
||||
return (end_marker,)
|
||||
|
||||
|
||||
def decoder_1_ADC(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||
f.seek(start_index)
|
||||
channels = int.from_bytes(f.read(1),byteorder="big",signed=False)
|
||||
|
||||
lst = []
|
||||
|
||||
for i in range(channels):
|
||||
lst.append(int.from_bytes(f.read(3),byteorder="big",signed=False))
|
||||
|
||||
timestamp = int.from_bytes(f.read(4),byteorder="big",signed=False)
|
||||
end_marker = f.read(1)
|
||||
if not end_marker:
|
||||
end_marker = END_MARKER
|
||||
else:
|
||||
end_marker = end_marker[0]
|
||||
if(end_marker != END_MARKER):
|
||||
print(f"start_index{start_index} end_marker:{end_marker.hex()}")
|
||||
return (-1)
|
||||
|
||||
with open(f"{filename}_ADC_data.csv", "a") as file:
|
||||
file.write(f"{timestamp}")
|
||||
for number in lst:
|
||||
file.write(f",{number}")
|
||||
file.write("\n")
|
||||
|
||||
return (3*channels+5,(timestamp,lst))
|
||||
|
||||
LOG_MESSAGE_ENUM_LIST = []
|
||||
with open("SI_LOG_MESSAGES.txt") as file:
|
||||
for message in file:
|
||||
LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
|
||||
|
||||
def decoder_SI(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||
f.seek(start_index)
|
||||
LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False)
|
||||
TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False)
|
||||
try:
|
||||
LogName = LOG_MESSAGE_ENUM_LIST[LogNumber]
|
||||
except:
|
||||
LogName = f"ERROR UNKOWN: {LogNumber}"
|
||||
with open(f"{filename}_SI.csv", "a") as file:
|
||||
file.write(f"{TimeStamp},{LogName}\n")
|
||||
|
||||
return (6,(TimeStamp,LogNumber))
|
||||
|
||||
COMMAND_MESSAGE_ENUM_LIST = []
|
||||
with open("SI_COMMAND_MESSAGES.txt") as file:
|
||||
for message in file:
|
||||
COMMAND_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
|
||||
|
||||
def decoder_SI_command(start_index: int, f: BinaryIO, filename: str) -> tuple[int, tuple[int]]:
|
||||
f.seek(start_index)
|
||||
LogNumber = int.from_bytes(bytes=f.read(2),byteorder="big",signed=False)
|
||||
TimeStamp = int.from_bytes(bytes=f.read(4),byteorder="big",signed=False)
|
||||
try:
|
||||
LogName = COMMAND_MESSAGE_ENUM_LIST[LogNumber]
|
||||
except:
|
||||
LogName = f"ERROR UNKOWN: {LogNumber}"
|
||||
with open(f"{filename}_SI.csv", "a") as file:
|
||||
file.write(f"{TimeStamp},{LogName}\n")
|
||||
return (6,(TimeStamp,LogNumber))
|
||||
|
||||
def decode_unknown(start_index: int, f: BinaryIO, filename: str) -> int:
|
||||
"""
|
||||
Fallback decoder for unknown indicator values.
|
||||
Strategy: consume until END_MARKER. You can also choose to return -1 instead.
|
||||
"""
|
||||
return scan_to_end_marker(f, start_index, END_MARKER)
|
||||
|
||||
|
||||
# -------- Dispatcher that scans the file and calls the decoders -------- #
|
||||
|
||||
def process_frames_in_file(filename: str) -> Tuple[int, int]:
|
||||
"""
|
||||
Scans `filename` for frames with START_MARKER (0xAF), reads the indicator byte,
|
||||
dispatches to the appropriate decoder using structural pattern matching, and advances.
|
||||
|
||||
Decoder contract:
|
||||
- Signature: decoder(start_index: int, f: BinaryIO, filename: str) -> int
|
||||
- `start_index` is the index of the first byte AFTER the indicator byte.
|
||||
- Return the number of bytes consumed starting at `start_index` (typically includes the END_MARKER),
|
||||
or -1 on failure.
|
||||
|
||||
Returns:
|
||||
(ok_count, fail_count): the number of frames successfully decoded and the number that failed.
|
||||
"""
|
||||
ok = 0
|
||||
fail = 0
|
||||
|
||||
|
||||
|
||||
try:
|
||||
with open(f"{filename}.bin", "rb") as f:
|
||||
# Stream through the file byte-by-byte to find START_MARKER
|
||||
while True:
|
||||
b = f.read(1)
|
||||
if not b:
|
||||
break # EOF
|
||||
|
||||
if b[0] != START_MARKER:
|
||||
continue # keep scanning
|
||||
|
||||
# Read the indicator byte immediately after the start marker
|
||||
indicator_raw = f.read(1)
|
||||
if not indicator_raw:
|
||||
# EOF right after a start marker; incomplete frame
|
||||
fail += 1
|
||||
break
|
||||
|
||||
indicator = indicator_raw[0]
|
||||
start_index = f.tell() # first byte after the indicator
|
||||
|
||||
# ---- Structural Pattern Matching to choose decoder ---- #
|
||||
match indicator:
|
||||
case 0x01:
|
||||
decoder = decoder_1_ADC
|
||||
case 0x02:
|
||||
decoder = decoder_SI
|
||||
case 0x03:
|
||||
decoder = decoder_SI_command
|
||||
case _:
|
||||
print(f"Could not recognize indicator:{indicator} at:{start_index-1}")
|
||||
decoder = decode_unknown
|
||||
# ------------------------------------------------------ #
|
||||
|
||||
# Call the decoder with the agreed arguments
|
||||
bytes_used = decoder(start_index, f, filename)[0]
|
||||
|
||||
if bytes_used == -1:
|
||||
# Failed to decode; resynchronize by advancing one byte past start_index
|
||||
fail += 1
|
||||
f.seek(start_index) # go to the first payload byte
|
||||
_ = f.read(1) # advance 1 byte and continue scanning
|
||||
else:
|
||||
ok += 1
|
||||
# Advance to the next position right after the bytes we consumed
|
||||
next_pos = start_index + bytes_used
|
||||
f.seek(next_pos)
|
||||
|
||||
return ok, fail
|
||||
|
||||
except OSError as e:
|
||||
# File can't be opened/read
|
||||
raise RuntimeError(f"Could not process file '{filename}': {e}") from e
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
files = list_files_with_suffix()
|
||||
files_csv = list_files_with_suffix(".csv")
|
||||
|
||||
for i in range(len(files_csv)):
|
||||
for x in range(len(files)):
|
||||
if files[x].split(".")[0] in files_csv[i]:
|
||||
os.remove(files_csv[i])
|
||||
print(f"Removing file: {files_csv[i]}")
|
||||
|
||||
for file in files:
|
||||
print(process_frames_in_file(file.split(".")[0]))
|
||||
|
||||
print(files)
|
||||
Reference in New Issue
Block a user