Use the mainplot.py for interacting with the FC. type commands into the terminal and see the plot data on the screen.

This commit is contained in:
2025-12-02 14:51:43 +01:00
parent 0c7f46f610
commit 32e8ab0b24
5 changed files with 399 additions and 0 deletions

0
COMMANDS.txt Normal file
View File

19
SI_COMMAND_MESSAGES.txt Normal file
View File

@@ -0,0 +1,19 @@
STOP_STREAM
START_STREAM
RESET
WIPE_STM
UPTIME
PING
GET_STATE
ARM
DISARM
STATIC_FIRE1
STATIC_FIRE2
STATIC_FIRE3
ABORT
SERVO_SWEEP
SERVO_SET
GET_READINGS
SET_ST_SPEED
START_FLASH
START_USB

27
SI_LOG_MESSAGES.txt Normal file
View File

@@ -0,0 +1,27 @@
EMPTY_LOG = 0,
LOGGING_DATA,
FLASH_ERROR,
FLASH_START_ERROR,
FLASH_FULL,
FLASH_LOG_FULL,
ENTER_STATIC_FIRE,
EXIT_STATIC_FIRE,
ENTER_ARM,
UPDATE_ARM,
EXIT_ARM,
LOX_OPEN,
LOX_OPEN_FULL,
ETHANOL_OPEN,
ETHANOL_OPEN_FULL,
NITROGEN_OPEN,
NITROGEN_OPEN_FULL,
LOX_CLOSING,
LOX_FULL_CLOSED,
ETHANOL_CLOSING,
ETHANOL_FULL_CLOSED,
NITROGEN_CLOSING,
NITROGEN_FULL_CLOSED,
ETHANOL_DRAIN_OPEN,
LOX_DRAIN_OPEN,
ETHANOL_DRAIN_CLOSED,
LOX_DRAIN_CLOSED

185
Serial_decoder.py Normal file
View File

@@ -0,0 +1,185 @@
import serial
from dataclasses import dataclass
import threading
from dataclasses import dataclass, field
from typing import List
from datetime import datetime
import queue
import re
PORT = "COM9"
SerialPort = serial.Serial(port=PORT,baudrate=250000)
input_queue = queue.SimpleQueue()
output_queue = queue.SimpleQueue()
@dataclass
class ReturnDecoder:
name: str
text_respons: str = ""
ADC_data: List[int] = field(default_factory=lambda: [])
timestamp: int = 0
log_message: str = ""
bytes_read: int = 0
#length is 3*16+4+1
def ADC_FULLRANK(buffer: bytes) -> ReturnDecoder:
if len(buffer)<53 : return ReturnDecoder("EMPTY")
return_var = ReturnDecoder("ADC_FULLRANK")
for i in range(16):
return_var.ADC_data.append(int.from_bytes(bytes=buffer[3*i:3+3*i],byteorder="big",signed=True))
return_var.timestamp = int.from_bytes(bytes=buffer[48:52],byteorder="big",signed=False)
if buffer[52] != 0xFA:
return_var.name = f"FAULT_DECODER_END"
return_var.bytes_read = 1
print(buffer.hex(sep=" "))
return_var.bytes_read = 55
return return_var
LOG_MESSAGE_ENUM_LIST = []
with open("SI_LOG_MESSAGES.txt") as file:
for message in file:
LOG_MESSAGE_ENUM_LIST.append(re.sub("[\n,=0 ]","",message))
def SI_DECODER(buffer: bytes) -> ReturnDecoder:
if(len(buffer)<7): return ReturnDecoder("EMPTY")
return_var = ReturnDecoder("SI_DECODER")
LogNumber = int.from_bytes(bytes=buffer[0:2],byteorder="big",signed=False)
try:
return_var.log_message = LOG_MESSAGE_ENUM_LIST[LogNumber]
except:
return_var.name = "WRONG_SI_NUMBER"
return_var.log_message = LogNumber
return_var.bytes_read = 1
return return_var
return_var.timestamp = int.from_bytes(bytes=buffer[2:6],byteorder="big",signed=False)
if buffer[6] != 0xFA:
return_var.bytes_read = 1
return_var.name = f"FAULT_SI_DECODER_END"
print(buffer.hex(sep=" "))
return_var.bytes_read = 7
return return_var
AF_DECODERS = {
0x01: ADC_FULLRANK,
0x02: SI_DECODER
}
def AFDecoder(buffer: bytes) -> ReturnDecoder:
if len(buffer) < 2: return ReturnDecoder("EMPTY")
ret = ReturnDecoder("FAULT_DECODER")
ret.bytes_read = 1
if buffer[1] in AF_DECODERS:
try:
return AF_DECODERS[buffer[1]](buffer[2:])
except:
return ret
else:
print("unkown message")
return ret
def PRDecoder(buffer: bytes) -> ReturnDecoder:
if buffer.find(b"\n") < 0:
return ReturnDecoder("EMPTY")
temp_str = buffer.decode("ascii",errors="replace")
ret = ReturnDecoder("PR_DECODER")
ret.text_respons = temp_str.split(sep='\n',maxsplit=2)[0]
ret.bytes_read = len(ret.text_respons)
return ret
def input_loop():
while True:
input_str = input()+'\n\r'
if input_str.find("exit") != -1:
return
input_queue.put(input_str)
def output_loop():
ret_DEC : ReturnDecoder = ReturnDecoder("EMPTY")
buffer : bytes = bytes(b"")
SerialPort.timeout = 0.1
input_str: str = ""
ret_DEC.name = "EMPTY"
ret_DEC.bytes_read = 0
while True:
try:
input_str = input_queue.get_nowait()
print(f"SD: sending {input_str}")
SerialPort.write(input_str.encode("ascii"))
except:
pass
buffer = buffer + SerialPort.read_all()
AF_index = buffer.find(0xAF)
PR_index = buffer.find(b"PR")
if AF_index >= 0 and PR_index >= 0:
if AF_index < PR_index:
#if(AF_index): print(buffer[:AF_index].decode("ascii","replace"))
buffer = buffer[AF_index:]
ret_DEC = AFDecoder(buffer)
else:
#if(PR_index): print(buffer[:PR_index].decode("ascii","replace"))
buffer = buffer[PR_index:]
ret_DEC = PRDecoder(buffer)
elif AF_index >= 0:
#if(AF_index): print(buffer[:AF_index].decode("ascii","replace"))
buffer = buffer[AF_index:]
ret_DEC = AFDecoder(buffer)
elif PR_index >= 0:
#if(PR_index): print(buffer[:PR_index].decode("ascii","replace"))
buffer = buffer[PR_index:]
ret_DEC = PRDecoder(buffer)
if ret_DEC.bytes_read != 0:
buffer = buffer[ret_DEC.bytes_read:]
if ret_DEC.name != "EMPTY":
output_queue.put(ret_DEC)
ret_DEC = ReturnDecoder("EMPTY")
def GetReturn(timeout: float) -> ReturnDecoder | None:
try:
return output_queue.get(timeout=timeout)
except queue.Empty:
return None
def SendCommand(input_str: str):
input_queue.put(input_str)
def StartInputOutput() -> tuple[threading.Thread, threading.Thread]:
t1 = threading.Thread(target=output_loop,daemon=True)
t2 = threading.Thread(target=input_loop, daemon=True)
t1.start()
t2.start()
return (t1,t2)
def StartOutput() -> threading.Thread:
t1 = threading.Thread(target=output_loop,daemon=True)
t1.start()
return t1
if __name__ == "__main__":
t1 = threading.Thread(target=output_loop,daemon=True)
t2 = threading.Thread(target=input_loop)
t1.start()
t2.start()
while True:
value = GetReturn(0.5)
if t2.is_alive() == False:
break
if value == None:
continue
match value.name:
case "ADC_FULLRANK":
print(f"time:{value.timestamp},{value.ADC_data}")
case "SI_DECODER":
print(f"\033[Ftime:{value.timestamp}, SI:{value.log_message}",end="\r")
case "PR_DECODER":
print(f"PRD:{value.text_respons}")
case _:
print(f"Error {value.name}")
print("stopping")

168
mainplot.py Normal file
View File

@@ -0,0 +1,168 @@
import Serial_decoder as SD
import tkinter as tk
from tkinter import ttk
from datetime import datetime
import time
from collections import deque
# Matplotlib embedding in Tkinter
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# === Serial threads (unchanged) ===
tasks = SD.StartInputOutput() # t1: output_loop (daemon), t2: input_loop (daemon)
FILENAME = f"SaveData/{datetime.today().strftime('%Y_%m_%d_%H_%M')}"
# === Plot history ===
MAX_POINTS = 100 # rolling window length
history_x = deque(maxlen=MAX_POINTS) # timestamps
history_y = [deque(maxlen=MAX_POINTS) for _ in range(16)] # per-channel values
# === GUI setup ===
root = tk.Tk()
root.title("Live 4x4 Grid + Selectable Real-Time Plot")
# Left: 4x4 grid
grid_frame = ttk.Frame(root)
grid_frame.grid(row=0, column=0, padx=10, pady=10, sticky="n")
labels = []
for row in range(4):
for col in range(4):
idx = row * 4 + col
lbl = tk.Label(
grid_frame, name=f"channel{idx}",
text="0", width=10, height=3,
borderwidth=2, relief="groove", font=("Arial", 14)
)
lbl.grid(row=row, column=col, padx=5, pady=5)
labels.append(lbl)
# Last tick label (as you had)
last_tick_lbl = tk.Label(
grid_frame, text="Last Tick: -", width=20, height=3,
borderwidth=2, relief="groove", font=("Arial", 14)
)
lbl = tk.Label(root, text="Last Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14))
lbl.grid(row = 4, column=0, columnspan=2)
labels.append(lbl)
lbl = tk.Label(root, text="Delta Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14))
lbl.grid(row=4, column=2, columnspan= 2)
labels.append(lbl)
# Checkboxes to select plotted channels
check_frame = ttk.LabelFrame(root, text="Plot Selection")
check_frame.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="nw")
check_vars = []
for i in range(16):
var = tk.IntVar(value=0)
chk = ttk.Checkbutton(check_frame, text=f"Ch {i+1}", variable=var, command=lambda: update_plot())
chk.grid(row=i//4, column=i%4, sticky="w", padx=4, pady=2)
check_vars.append(var)
# Right: Matplotlib plot
plot_frame = ttk.Frame(root)
plot_frame.grid(row=0, column=1, rowspan=2, padx=10, pady=10, sticky="n")
fig = Figure(figsize=(6, 4), dpi=100)
ax = fig.add_subplot(111)
ax.set_title("Selected Channels (rolling)")
ax.set_xlabel("Timestamp")
ax.set_ylabel("Value")
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
canvas.get_tk_widget().pack(fill="both", expand=True)
# Optional controls
controls_frame = ttk.Frame(plot_frame)
controls_frame.pack(fill="x", pady=5)
# Clear selection button
def clear_selection():
for v in check_vars:
v.set(0)
update_plot()
ttk.Button(controls_frame, text="Clear Selection", command=clear_selection).pack(side="left")
# === Update functions ===
Last_tick: int = 0
def update_grid(values: list,tick: int):
global Last_tick
with open(f"{FILENAME}_ADC.csv","+a") as file:
file.write(f"{tick}")
for i in range(16):
file.write(f",{values[i]}")
labels[i].config(text=f"{values[i]:n}")
labels[16].config(text=f"Last Tick:{tick:n}")
labels[17].config(text=f"Delta Tick:{(tick-Last_tick):n}")
Last_tick = tick
file.write("\n")
def update_SI(value:SD.ReturnDecoder):
with open(f"{FILENAME}_SI.csv","+a") as file:
file.write(f"{value.timestamp},{value.log_message}\n")
print(f"\033[F\033[2Ktime:{value.timestamp}, SI:{value.log_message}",end="\n")
def append_history(adc_values: list, timestamp: int):
"""Store the new sample in the rolling history."""
history_x.append(timestamp)
for i in range(16):
history_y[i].append(adc_values[i])
def update_plot():
"""Redraw plot based on selected channels and available history."""
ax.clear()
ax.set_title("Selected Channels (rolling)")
ax.set_xlabel("Timestamp")
ax.set_ylabel("Value")
selected = [i for i, var in enumerate(check_vars) if var.get() == 1]
if len(history_x) == 0 or len(selected) == 0:
ax.text(0.5, 0.5, "No data / No channels selected", ha="center", va="center", transform=ax.transAxes)
canvas.draw()
return
x = list(history_x)
for idx in selected:
y = list(history_y[idx])
ax.plot(x, y, label=f"Ch {idx+1}")
ax.legend(loc="upper left", fontsize=8)
ax.grid(True, linestyle="--", alpha=0.3)
canvas.draw()
def update_panel():
"""Main polling loop that consumes Serial_decoder outputs (unchanged structure)."""
values: SD.ReturnDecoder = SD.GetReturn(0.1) # non-blocking read with timeout
if values is None:
root.after(10, update_panel)
return
# Stop if input thread died
if tasks[1].is_alive() == False:
root.quit()
match values.name:
case "ADC_FULLRANK":
# Update grid + history + plot
update_grid(values=values.ADC_data, tick=values.timestamp)
append_history(values.ADC_data, values.timestamp)
update_plot() # draw right away; if too heavy, throttle with a timer
case "SI_DECODER":
update_SI(values)
case "PR_DECODER":
print(f"PRD:{values.text_respons}")
case _:
print(f"Error {values.name}")
# Continue polling
root.after(10, update_panel) # 10 ms
# Start polling and GUI loop
update_panel()
# FC commands
SD.SerialPort.write(b"SET_ST_SPEED 100\n\r")
root.mainloop()