# Listing metadata from the code viewer: 347 lines, 11 KiB, Python
import os
import time
import tkinter as tk
from collections import deque
from datetime import datetime
from tkinter import ttk

# Matplotlib embedding in Tkinter
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure

import Serial_decoder as SD
|
|
|
|
# === Serial threads (unchanged) ===
tasks = SD.StartInputOutput()  # t1: output_loop (daemon), t2: input_loop (daemon)

# === Log files ===
# Every CSV log written during this run shares one timestamped prefix inside
# SAVE_DIR. open(..., "a") creates a missing file but not a missing directory,
# so the folder is created here once instead of failing on the first sample.
SAVE_DIR = "SaveData"
os.makedirs(SAVE_DIR, exist_ok=True)
FILENAME = f"{SAVE_DIR}/{datetime.today().strftime('%Y_%m_%d_%H_%M')}"

# === Commands window config ===
COMMANDS_FILE = "COMMANDS.txt"  # one command per line
|
|
|
|
|
|
# === Plot history ===
MAX_POINTS = 100  # number of samples kept in the rolling window

# Derived channels (the quantities that are actually plotted), by index:
#   0      Thrust (avg of load cells 1 & 2)
#   1-6    Temp 1..6
#   7-14   Pressure 1..8
DERIVED_CHANNEL_LABELS = (
    ["Thrust"]
    + [f"Temp {n}" for n in range(1, 7)]
    + [f"Pressure {n}" for n in range(1, 9)]
)
NUM_DERIVED_CHANNELS = len(DERIVED_CHANNEL_LABELS)  # 15

# One shared x series (timestamps) plus one value series per derived channel.
# Each deque silently drops its oldest entry once it holds MAX_POINTS items.
history_x = deque(maxlen=MAX_POINTS)
history_y = [deque(maxlen=MAX_POINTS) for _ in range(NUM_DERIVED_CHANNELS)]
|
|
|
|
# === GUI setup ===
root = tk.Tk()
root.title("Live 4x4 Grid + Selectable Real-Time Plot")

# Left: 4x4 grid (raw ADC values from 16 channels)
grid_frame = ttk.Frame(root)
grid_frame.grid(row=0, column=0, padx=10, pady=10, sticky="n")

# Look shared by every read-out label created below.
BOX_STYLE = {"borderwidth": 2, "relief": "groove", "font": ("Arial", 14)}

# labels[0..15] are the grid cells in row-major order.
labels = []
for idx in range(16):
    row, col = divmod(idx, 4)
    lbl = tk.Label(
        grid_frame,
        name=f"channel{idx}",
        text="0",
        width=10,
        height=3,
        **BOX_STYLE,
    )
    lbl.grid(row=row, column=col, padx=5, pady=5)
    labels.append(lbl)

# Last / delta tick labels: labels[16] and labels[17].
# NOTE(review): both labels span two columns starting at column 0 and 1, so
# their grid cells overlap in column 1 - confirm this layout is intended.
for tick_col, caption in enumerate(("Last Tick:0", "Delta Tick:0")):
    lbl = tk.Label(root, text=caption, width=20, height=3, **BOX_STYLE)
    lbl.grid(row=4, column=tick_col, columnspan=2)
    labels.append(lbl)
|
|
|
|
# Checkboxes to select plotted (derived) channels, laid out four per row.
check_frame = ttk.LabelFrame(root, text="Plot Selection")
check_frame.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="nw")

# check_vars[i] holds 1 when derived channel i is ticked, 0 otherwise.
check_vars = []
for i, ch_label in enumerate(DERIVED_CHANNEL_LABELS):
    box_row, box_col = divmod(i, 4)
    var = tk.IntVar(value=0)
    check_vars.append(var)
    # No command yet: update_plot is defined further down and attached there.
    chk = ttk.Checkbutton(check_frame, text=ch_label, variable=var, command=None)
    chk.grid(row=box_row, column=box_col, sticky="w", padx=4, pady=2)
|
|
|
|
# Right: Matplotlib plot, spanning the grid row and the checkbox row
plot_frame = ttk.Frame(root)
plot_frame.grid(row=0, column=1, rowspan=2, padx=10, pady=10, sticky="n")

fig = Figure(figsize=(6, 4), dpi=100)
ax = fig.add_subplot(1, 1, 1)
ax.set(title="Selected Channels", xlabel="Timestamp", ylabel="Value")

# Embed the figure in the Tk frame.
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
canvas.get_tk_widget().pack(fill="both", expand=True)

# Optional controls: a button row underneath the plot
controls_frame = ttk.Frame(plot_frame)
controls_frame.pack(fill="x", pady=5)
|
|
|
|
|
|
def clear_selection():
    """Untick every plot-selection checkbox, then redraw the plot."""
    for selection_var in check_vars:
        selection_var.set(0)
    update_plot()
|
|
|
|
|
|
def clear_history():
    """Drop all stored samples (timestamps and every channel) and redraw."""
    for series in (history_x, *history_y):
        series.clear()
    # Redraw so the plot switches to its 'No data' message.
    update_plot()
|
|
|
|
|
|
# Left-aligned buttons for resetting the selection and the stored history.
for caption, handler in (
    ("Clear Selection", clear_selection),
    ("Clear History", clear_history),
):
    ttk.Button(controls_frame, text=caption, command=handler).pack(side="left")
|
|
|
|
|
|
# === Commands window ===
|
|
|
|
def open_commands_window():
    """
    Open a separate window holding one button per command in COMMANDS_FILE.

    Blank lines and lines whose first non-blank character is "#" are skipped.
    Pressing a button passes that command's text to SD.SendCommand. When the
    file cannot be read, or contains no commands, a red message is shown in
    the window instead of buttons.
    """
    cmd_win = tk.Toplevel(root)
    cmd_win.title("Command Buttons")

    outer = ttk.Frame(cmd_win, padding=10)
    outer.pack(fill="both", expand=True)

    # The buttons sit in a frame embedded in a canvas, so a vertical
    # scrollbar can move them when there are many commands.
    canvas_widget = tk.Canvas(outer, borderwidth=0)
    scrollbar = ttk.Scrollbar(outer, orient="vertical", command=canvas_widget.yview)
    cmds_frame = ttk.Frame(canvas_widget)

    def _sync_scrollregion(_event):
        # Keep the scrollable area equal to the bounding box of the content.
        canvas_widget.configure(scrollregion=canvas_widget.bbox("all"))

    cmds_frame.bind("<Configure>", _sync_scrollregion)
    canvas_widget.create_window((0, 0), window=cmds_frame, anchor="nw")
    canvas_widget.configure(yscrollcommand=scrollbar.set)

    canvas_widget.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y")

    def _show_problem(message, **label_options):
        # Red, left-anchored notice shown in place of the buttons.
        ttk.Label(
            cmds_frame, text=message, foreground="red", **label_options
        ).pack(anchor="w")

    # Load commands from file
    try:
        with open(COMMANDS_FILE, "r") as f:
            stripped_lines = (line.strip() for line in f)
            commands = [
                text
                for text in stripped_lines
                if text and not text.startswith("#")
            ]
    except OSError as e:
        _show_problem(f"Could not read {COMMANDS_FILE}:\n{e}", justify="left")
        return

    if not commands:
        _show_problem(f"No commands found in {COMMANDS_FILE}.")
        return

    ttk.Style().configure("TButton", font=("Arial", 14))

    def _send(command):
        # Terminate the command like console input unless it already ends
        # with a newline or carriage return.
        if not command.endswith(("\n", "\r")):
            command = command + "\n\r"
        SD.SendCommand(command)

    # One button per command, two per row. The default argument pins each
    # button to its own command text.
    for i, cmd in enumerate(commands):
        row, col = divmod(i, 2)
        btn = ttk.Button(
            cmds_frame,
            text=cmd,
            command=lambda c=cmd: _send(c),
            style="TButton",
        )
        btn.grid(row=row, column=col, padx=5, pady=5, sticky="ew")

    # Both button columns share any extra width equally.
    for column in (0, 1):
        cmds_frame.columnconfigure(column, weight=1)
|
|
|
|
# Main-window button that opens a commands window again if it was closed
open_cmds_button = ttk.Button(
    controls_frame, text="Open Commands", command=open_commands_window
)
open_cmds_button.pack(side="left", padx=(10, 0))
|
|
|
|
|
|
# === Update functions ===
|
|
Last_tick: int = 0  # tick of the previously displayed sample


def update_grid(values: list, tick: int):
    """Show one raw ADC sample in the 4x4 grid and append it to the ADC CSV.

    One CSV row is written per call: the tick followed by the 16 readings.
    The two status labels show the tick formatted as HH:MM:SS.mmm (the tick
    is treated as a millisecond count) and the difference to the previous
    tick.

    Args:
        values: Raw ADC readings; entries 0-15 are used, one per grid cell.
        tick: Integer timestamp of the sample.

    Raises:
        IndexError: If ``values`` has fewer than 16 entries. This is raised
            before anything is written or displayed, so no partial CSV row
            or half-updated grid is left behind.
    """
    global Last_tick

    # Pull out all 16 readings first so a short sample fails up front.
    readings = [values[i] for i in range(16)]

    # Build the whole row and write it in one call; the file is only open
    # for the write itself, not while the widgets are updated.
    csv_row = ",".join(f"{field}" for field in (tick, *readings))
    with open(f"{FILENAME}_ADC.csv", "a") as file:
        file.write(f"{csv_row}\n")

    # labels[0..15] are the grid cells; zip stops after the 16 readings.
    for cell, reading in zip(labels, readings):
        cell.config(text=f"{reading:n}")

    hours, rem = divmod(tick, 3_600_000)  # 1000*60*60
    minutes, rem = divmod(rem, 60_000)  # 1000*60
    seconds, milliseconds = divmod(rem, 1000)
    labels[16].config(
        text=f"UP Time:{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
    )
    labels[17].config(text=f"Delta Tick:{(tick - Last_tick):n}")
    Last_tick = tick
|
|
|
|
|
|
def update_SI(value: SD.ReturnDecoder):
    """Append one SI record (timestamp, log message) to the SI CSV and echo it to stdout."""
    si_log_path = f"{FILENAME}_SI.csv"
    with open(si_log_path, "a") as file:
        file.write(f"{value.timestamp},{value.log_message}\n")
    print(f"Time:{value.timestamp}, SI:{value.log_message}", end="\n")
|
|
|
|
|
|
Last_history_call: int = 0  # wall-clock ms at which the last sample was accepted


def append_history(adc_values: list, timestamp: int):
    """Convert one raw ADC sample to derived channels and store it in the rolling history.

    The sample is dropped, leaving the history untouched, when

    * its timestamp is older than the newest stored timestamp, or
    * its timestamp lies further ahead of the newest stored timestamp than
      twice the wall-clock milliseconds elapsed since the last accepted
      sample (a message is printed in this case).

    All derived values are computed before anything is stored, so
    ``history_x`` and every ``history_y`` series always grow together.

    Args:
        adc_values: Raw ADC readings; entries 0-15 are used.
        timestamp: Sample timestamp; compared against wall-clock
            milliseconds, so presumably in ms - confirm against the sender.

    Raises:
        IndexError: If ``adc_values`` has fewer than 16 entries. The history
            is left unchanged.
    """
    global Last_history_call
    now_ms = time.time_ns() // 1_000_000

    if history_x:
        newest = history_x[-1]
        if timestamp < newest:
            return  # discard out-of-order samples
        allowed_jump = (now_ms - Last_history_call) * 2
        if newest + allowed_jump < timestamp:
            print(f"Discarding future sample: {timestamp} > {newest} + {allowed_jump}")
            return  # discard samples too far in future

    # 0: Thrust (average of load cells on raw channels 0 and 1)
    lc_avg = (adc_values[0] + adc_values[1]) / 2
    derived = [-1.166759308000000e-04 * lc_avg + 4.971416323340051e+02]

    # 1-6: Temperatures from raw channels 2..7
    derived.extend(0.000111 * adc_values[i] + 2.31991 for i in range(2, 8))

    # 7-14: Pressures from raw channels 8..15. Raw channel 12 is halved
    # before the shared calibration is applied.
    # NOTE(review): presumably that sensor has a different scale - confirm.
    for i in range(8, 16):
        raw = adc_values[i] / 2 if i == 12 else adc_values[i]
        derived.append(0.0000153522 * raw - 6.5652036917)

    # Commit only after every value was computed successfully.
    history_x.append(timestamp)
    Last_history_call = now_ms
    for series, value in zip(history_y, derived):
        series.append(value)
|
|
|
|
|
|
def update_plot():
    """Rebuild the axes: one line per ticked channel, or a placeholder message.

    The placeholder is shown when no samples are stored or no channel is
    ticked. The canvas is redrawn in either case.
    """
    ax.clear()
    ax.set(title="Selected Channels", xlabel="Timestamp", ylabel="Value")

    selected = [i for i, var in enumerate(check_vars) if var.get() == 1]

    if history_x and selected:
        x = list(history_x)
        for idx in selected:
            # Safety: skip any index without a matching history series.
            if idx >= len(history_y):
                continue
            ax.plot(x, list(history_y[idx]), label=DERIVED_CHANNEL_LABELS[idx])
        ax.legend(loc="upper left", fontsize=8)
        ax.grid(True, linestyle="--", alpha=0.3)
    else:
        ax.text(
            0.5, 0.5,
            "No data / No channels selected",
            ha="center", va="center",
            transform=ax.transAxes,
        )

    canvas.draw()
|
|
|
|
|
|
# update_plot is defined now, so the checkboxes (created without a command)
# can be wired to redraw the plot on every toggle.
for widget in check_frame.winfo_children():
    if not isinstance(widget, ttk.Checkbutton):
        continue
    widget.config(command=update_plot)
|
|
|
|
|
|
def update_panel():
|
|
"""Main polling loop that consumes Serial_decoder outputs."""
|
|
values: SD.ReturnDecoder = SD.GetReturn(0.1) # non-blocking read with timeout
|
|
if values is None:
|
|
root.after(10, update_panel)
|
|
return
|
|
|
|
# Stop if input thread died
|
|
if tasks[1].is_alive() is False:
|
|
root.quit()
|
|
|
|
match values.name:
|
|
case "ADC_FULLRANK":
|
|
# Update grid + history + plot
|
|
update_grid(values=values.ADC_data, tick=values.timestamp)
|
|
append_history(values.ADC_data, values.timestamp)
|
|
update_plot() # draw right away; if too heavy, throttle with a timer
|
|
case "SI_DECODER":
|
|
update_SI(values)
|
|
case "PR_DECODER":
|
|
print(f"PRD:{values.text_respons}")
|
|
case _:
|
|
print(f"Error {values.name}")
|
|
|
|
# Continue polling
|
|
root.after(10, update_panel) # 10 ms
|
|
|
|
|
|
# Kick off polling: the first poll runs right here and every poll schedules
# the next one through root.after.
update_panel()

# FC commands: written straight to the serial port before the GUI loop starts
startup_command = b"SET_ST_SPEED 100\n\r"
SD.SerialPort.write(startup_command)

# Show the commands window right away
open_commands_window()

# Hand control to Tk; returns once the main loop is stopped.
root.mainloop()
|