import Serial_decoder as SD import tkinter as tk from tkinter import ttk from datetime import datetime import time from collections import deque # Matplotlib embedding in Tkinter from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg # === Serial threads (unchanged) === tasks = SD.StartInputOutput() # t1: output_loop (daemon), t2: input_loop (daemon) FILENAME = f"SaveData/{datetime.today().strftime('%Y_%m_%d_%H_%M')}" # === Commands window config === COMMANDS_FILE = "COMMANDS.txt" # one command per line # === Plot history === MAX_POINTS = 100 # rolling window length # Derived channels (what we actually plot): # 0: Thrust (avg of load cells 1 & 2) # 1-6: Temp 1..6 # 7-14: Pressure 1..8 DERIVED_CHANNEL_LABELS = [ "Thrust", # 0 "Temp 1", # 1 "Temp 2", # 2 "Temp 3", # 3 "Temp 4", # 4 "Temp 5", # 5 "Temp 6", # 6 "Pressure 1", # 7 "Pressure 2", # 8 "Pressure 3", # 9 "Pressure 4", # 10 "Pressure 5", # 11 "Pressure 6", # 12 "Pressure 7", # 13 "Pressure 8", # 14 ] NUM_DERIVED_CHANNELS = len(DERIVED_CHANNEL_LABELS) # 15 history_x = deque(maxlen=MAX_POINTS) # timestamps history_y = [deque(maxlen=MAX_POINTS) for _ in range(NUM_DERIVED_CHANNELS)] # per-channel values # === GUI setup === root = tk.Tk() root.title("Live 4x4 Grid + Selectable Real-Time Plot") # Left: 4x4 grid (raw ADC values from 16 channels) grid_frame = ttk.Frame(root) grid_frame.grid(row=0, column=0, padx=10, pady=10, sticky="n") labels = [] for row in range(4): for col in range(4): idx = row * 4 + col lbl = tk.Label( grid_frame, name=f"channel{idx}", text="0", width=10, height=3, borderwidth=2, relief="groove", font=("Arial", 14) ) lbl.grid(row=row, column=col, padx=5, pady=5) labels.append(lbl) # Last / delta tick labels lbl = tk.Label(root, text="Last Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14)) lbl.grid(row=4, column=0, columnspan=2) labels.append(lbl) # index 16 lbl = tk.Label(root, text="Delta Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14)) lbl.grid(row=4, column=1, columnspan=2) labels.append(lbl) # index 17 # Checkboxes to select plotted (derived) channels check_frame = ttk.LabelFrame(root, text="Plot Selection") check_frame.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="nw") check_vars = [] for i, ch_label in enumerate(DERIVED_CHANNEL_LABELS): var = tk.IntVar(value=0) chk = ttk.Checkbutton( check_frame, text=ch_label, variable=var, command=None # will assign update_plot after it is defined ) chk.grid(row=i // 4, column=i % 4, sticky="w", padx=4, pady=2) check_vars.append(var) # Right: Matplotlib plot plot_frame = ttk.Frame(root) plot_frame.grid(row=0, column=1, rowspan=2, padx=10, pady=10, sticky="n") fig = Figure(figsize=(6, 4), dpi=100) ax = fig.add_subplot(111) ax.set_title("Selected Channels") ax.set_xlabel("Timestamp") ax.set_ylabel("Value") canvas = FigureCanvasTkAgg(fig, master=plot_frame) canvas.get_tk_widget().pack(fill="both", expand=True) # Optional controls controls_frame = ttk.Frame(plot_frame) controls_frame.pack(fill="x", pady=5) def clear_selection(): for v in check_vars: v.set(0) update_plot() def clear_history(): """Clear stored history for all plotted channels.""" history_x.clear() for dq in history_y: dq.clear() update_plot() # refresh plot so it shows 'No data' message ttk.Button(controls_frame, text="Clear Selection", command=clear_selection).pack(side="left") ttk.Button(controls_frame, text="Clear History", command=clear_history).pack(side="left") # === Commands window === def open_commands_window(): """ Create an extra window that shows buttons for each command found in COMMANDS.txt. Each button sends the corresponding command string via SD.SendCommand. """ cmd_win = tk.Toplevel(root) cmd_win.title("Command Buttons") outer = ttk.Frame(cmd_win, padding=10) outer.pack(fill="both", expand=True) # Scrollable area in case there are many commands canvas_widget = tk.Canvas(outer, borderwidth=0) scrollbar = ttk.Scrollbar(outer, orient="vertical", command=canvas_widget.yview) cmds_frame = ttk.Frame(canvas_widget) cmds_frame.bind( "", lambda e: canvas_widget.configure(scrollregion=canvas_widget.bbox("all")) ) canvas_widget.create_window((0, 0), window=cmds_frame, anchor="nw") canvas_widget.configure(yscrollcommand=scrollbar.set) canvas_widget.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Load commands from file try: with open(COMMANDS_FILE, "r") as f: commands = [ line.strip() for line in f if line.strip() and not line.lstrip().startswith("#") ] except OSError as e: ttk.Label( cmds_frame, text=f"Could not read {COMMANDS_FILE}:\n{e}", foreground="red", justify="left" ).pack(anchor="w") return if not commands: ttk.Label( cmds_frame, text=f"No commands found in {COMMANDS_FILE}.", foreground="red" ).pack(anchor="w") return s = ttk.Style() s.configure('TButton', font=('Arial', 14)) # Create a button per command for i , cmd in enumerate(commands): def make_callback(c=cmd): def _cb(): # Ensure it ends with newline/carriage return similar to console input to_send = c if not to_send.endswith("\n") and not to_send.endswith("\r"): to_send = to_send + "\n\r" SD.SendCommand(to_send) return _cb row = i // 2 col = i % 2 btn = ttk.Button(cmds_frame, text=cmd, command=make_callback(),style='TButton') btn.grid(row=row, column=col, padx=5, pady=5, sticky="ew") cmds_frame.columnconfigure(0, weight=1) cmds_frame.columnconfigure(1, weight=1) # Add a button in the main UI to reopen the commands window if needed ttk.Button(controls_frame, text="Open Commands", command=open_commands_window).pack(side="left", padx=(10, 0)) # === Update functions === Last_tick: int = 0 def update_grid(values: list, tick: int): """Update the 4x4 grid with raw ADC values and write ADC csv.""" global Last_tick # NOTE: mode should be "a" or "a+", not "+a" with open(f"{FILENAME}_ADC.csv", "a") as file: file.write(f"{tick}") for i in range(16): # 16 raw ADC channels file.write(f",{values[i]}") labels[i].config(text=f"{values[i]:n}") ms_total = tick hours, rem = divmod(ms_total, 3_600_000) # 1000*60*60 minutes, rem = divmod(rem, 60_000) # 1000*60 seconds, milliseconds = divmod(rem, 1000) labels[16].config(text=f"UP Time:{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}") labels[17].config(text=f"Delta Tick:{(tick - Last_tick):n}") Last_tick = tick file.write("\n") def update_SI(value: SD.ReturnDecoder): # NOTE: mode should be "a" or "a+", not "+a" with open(f"{FILENAME}_SI.csv", "a") as file: file.write(f"{value.timestamp},{value.log_message}\n") print(f"Time:{value.timestamp}, SI:{value.log_message}", end="\n") Last_history_call: int = 0 def append_history(adc_values: list, timestamp: int): """Store the new sample in the rolling history (derived units).""" global Last_history_call if len(history_x) > 0 and timestamp < history_x[-1]: return # discard out-of-order samples if len(history_x) > 0 and (history_x[-1] + (int(time.time_ns() / 1000000) - Last_history_call) * 2) < timestamp: print(f"Discarding future sample: {timestamp} > {history_x[-1]} + " f"{(int(time.time_ns() / 1000000) - Last_history_call) * 2}") return # discard samples too far in future history_x.append(timestamp) Last_history_call = int(time.time_ns() / 1000000) # in ms # 0: Thrust (average of load cells 0 and 1) lc_avg = (adc_values[0] + adc_values[1]) / 2 weight = -1.166759308000000e-04 * lc_avg + 4.971416323340051e+02 history_y[0].append(weight) # 1-6: Temperatures from channels 2..7 for i in range(2, 8): # six thermocouples temp = 0.000111 * adc_values[i] + 2.31991 history_y[i - 1].append(temp) # 2→1, 7→6 # 7-14: Pressures from channels 8..15 for i in range(8, 16): # eight pressure sensors if i == 12: pres = 0.0000153522 * (adc_values[i] / 2) - 6.5652036917 history_y[i - 1].append(pres) # 8→7, 15→14 else: pres = 0.0000153522 * adc_values[i] - 6.5652036917 history_y[i - 1].append(pres) # 8→7, 15→14 def update_plot(): """Redraw plot based on selected derived channels and available history.""" ax.clear() ax.set_title("Selected Channels") ax.set_xlabel("Timestamp") ax.set_ylabel("Value") selected = [i for i, var in enumerate(check_vars) if var.get() == 1] if len(history_x) == 0 or len(selected) == 0: ax.text( 0.5, 0.5, "No data / No channels selected", ha="center", va="center", transform=ax.transAxes ) canvas.draw() return x = list(history_x) for idx in selected: # safety: guard against any mismatch, just in case if idx < len(history_y): y = list(history_y[idx]) ax.plot(x, y, label=DERIVED_CHANNEL_LABELS[idx]) ax.legend(loc="upper left", fontsize=8) ax.grid(True, linestyle="--", alpha=0.3) canvas.draw() # Now that update_plot exists, fix the checkbox command to point to it for child in check_frame.winfo_children(): if isinstance(child, ttk.Checkbutton): child.config(command=update_plot) def update_panel(): """Main polling loop that consumes Serial_decoder outputs.""" values: SD.ReturnDecoder = SD.GetReturn(0.1) # non-blocking read with timeout if values is None: root.after(10, update_panel) return # Stop if input thread died if tasks[1].is_alive() is False: root.quit() match values.name: case "ADC_FULLRANK": # Update grid + history + plot update_grid(values=values.ADC_data, tick=values.timestamp) append_history(values.ADC_data, values.timestamp) update_plot() # draw right away; if too heavy, throttle with a timer case "SI_DECODER": update_SI(values) case "PR_DECODER": print(f"PRD:{values.text_respons}") case _: print(f"Error {values.name}") # Continue polling root.after(10, update_panel) # 10 ms # Start polling and GUI loop update_panel() # FC commands SD.SerialPort.write(b"SET_ST_SPEED 100\n\r") # Open the commands window at startup open_commands_window() root.mainloop()