import Serial_decoder as SD import tkinter as tk from tkinter import ttk from datetime import datetime import time from collections import deque # Matplotlib embedding in Tkinter from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg # === Serial threads (unchanged) === tasks = SD.StartInputOutput() # t1: output_loop (daemon), t2: input_loop (daemon) FILENAME = f"SaveData/{datetime.today().strftime('%Y_%m_%d_%H_%M')}" # === Plot history === MAX_POINTS = 100 # rolling window length # Derived channels (what we actually plot): # 0: Thrust (avg of load cells 1 & 2) # 1-6: Temp 1..6 # 7-14: Pressure 1..8 DERIVED_CHANNEL_LABELS = [ "Thrust", # 0 "Temp 1", # 1 "Temp 2", # 2 "Temp 3", # 3 "Temp 4", # 4 "Temp 5", # 5 "Temp 6", # 6 "Pressure 1", # 7 "Pressure 2", # 8 "Pressure 3", # 9 "Pressure 4", # 10 "Pressure 5", # 11 "Pressure 6", # 12 "Pressure 7", # 13 "Pressure 8", # 14 ] NUM_DERIVED_CHANNELS = len(DERIVED_CHANNEL_LABELS) # 15 history_x = deque(maxlen=MAX_POINTS) # timestamps history_y = [deque(maxlen=MAX_POINTS) for _ in range(NUM_DERIVED_CHANNELS)] # per-channel values # === GUI setup === root = tk.Tk() root.title("Live 4x4 Grid + Selectable Real-Time Plot") # Left: 4x4 grid (raw ADC values from 16 channels) grid_frame = ttk.Frame(root) grid_frame.grid(row=0, column=0, padx=10, pady=10, sticky="n") labels = [] for row in range(4): for col in range(4): idx = row * 4 + col lbl = tk.Label( grid_frame, name=f"channel{idx}", text="0", width=10, height=3, borderwidth=2, relief="groove", font=("Arial", 14) ) lbl.grid(row=row, column=col, padx=5, pady=5) labels.append(lbl) # Last / delta tick labels lbl = tk.Label(root, text="Last Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14)) lbl.grid(row=4, column=0, columnspan=2) labels.append(lbl) # index 16 lbl = tk.Label(root, text="Delta Tick:0", width=20, height=3, borderwidth=2, relief="groove", font=("Arial", 14)) lbl.grid(row=4, column=2, columnspan=2) labels.append(lbl) # index 17 # Checkboxes to select plotted (derived) channels check_frame = ttk.LabelFrame(root, text="Plot Selection") check_frame.grid(row=1, column=0, padx=10, pady=(0, 10), sticky="nw") check_vars = [] for i, ch_label in enumerate(DERIVED_CHANNEL_LABELS): var = tk.IntVar(value=0) chk = ttk.Checkbutton( check_frame, text=ch_label, variable=var, command=update_plot if 'update_plot' in globals() else None # placeholder; will be updated below ) chk.grid(row=i // 4, column=i % 4, sticky="w", padx=4, pady=2) check_vars.append(var) # Right: Matplotlib plot plot_frame = ttk.Frame(root) plot_frame.grid(row=0, column=1, rowspan=2, padx=10, pady=10, sticky="n") fig = Figure(figsize=(6, 4), dpi=100) ax = fig.add_subplot(111) ax.set_title("Selected Channels") ax.set_xlabel("Timestamp") ax.set_ylabel("Value") canvas = FigureCanvasTkAgg(fig, master=plot_frame) canvas.get_tk_widget().pack(fill="both", expand=True) # Optional controls controls_frame = ttk.Frame(plot_frame) controls_frame.pack(fill="x", pady=5) def clear_selection(): for v in check_vars: v.set(0) update_plot() def clear_history(): """Clear stored history for all plotted channels.""" history_x.clear() for dq in history_y: dq.clear() update_plot() # refresh plot so it shows 'No data' message ttk.Button(controls_frame, text="Clear Selection", command=clear_selection).pack(side="left") ttk.Button(controls_frame, text="Clear History", command=clear_history).pack(side="left") # === Update functions === Last_tick: int = 0 def update_grid(values: list, tick: int): """Update the 4x4 grid with raw ADC values and write ADC csv.""" global Last_tick # NOTE: mode should be "a" or "a+", not "+a" with open(f"{FILENAME}_ADC.csv", "a") as file: file.write(f"{tick}") for i in range(16): # 16 raw ADC channels file.write(f",{values[i]}") labels[i].config(text=f"{values[i]:n}") ms_total = tick hours, rem = divmod(ms_total, 3_600_000) # 1000*60*60 minutes, rem = divmod(rem, 60_000) # 1000*60 seconds, milliseconds = divmod(rem, 1000) labels[16].config( text=f"UP Time:{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}") labels[17].config(text=f"Delta Tick:{(tick - Last_tick):n}") Last_tick = tick file.write("\n") def update_SI(value: SD.ReturnDecoder): # NOTE: mode should be "a" or "a+", not "+a" with open(f"{FILENAME}_SI.csv", "a") as file: file.write(f"{value.timestamp},{value.log_message}\n") print(f"\033[F\033[2Ktime:{value.timestamp}, SI:{value.log_message}", end="\n") Last_history_call: int = 0 def append_history(adc_values: list, timestamp: int): """Store the new sample in the rolling history (derived units).""" global Last_history_call if len(history_x) > 0 and timestamp < history_x[-1]: return # discard out-of-order samples if len(history_x) > 0 and (history_x[-1] + (int(time.time_ns() / 1000000) - Last_history_call)*2) < timestamp: print(f"Discarding future sample: {timestamp} > {history_x[-1]} + {(int(time.time_ns() / 1000000) - Last_history_call)*2}") return # discard samples too far in future history_x.append(timestamp) Last_history_call = int(time.time_ns() / 1000000) # in ms # 0: Thrust (average of load cells 0 and 1) lc_avg = (adc_values[0] + adc_values[1]) / 2 weight = -1.166759308000000e-04 * lc_avg + 4.971416323340051e+02 history_y[0].append(weight) # 1-6: Temperatures from channels 2..7 for i in range(2, 8): # six thermocouples temp = 0.000111 * adc_values[i] + 2.31991 history_y[i - 1].append(temp) # 2→1, 7→6 # 7-14: Pressures from channels 8..15 for i in range(8, 16): # eight pressure sensors pres = 0.0000153522 * adc_values[i] - 6.5652036917 history_y[i - 1].append(pres) # 8→7, 15→14 def update_plot(): """Redraw plot based on selected derived channels and available history.""" ax.clear() ax.set_title("Selected Channels") ax.set_xlabel("Timestamp") ax.set_ylabel("Value") selected = [i for i, var in enumerate(check_vars) if var.get() == 1] if len(history_x) == 0 or len(selected) == 0: ax.text( 0.5, 0.5, "No data / No channels selected", ha="center", va="center", transform=ax.transAxes ) canvas.draw() return x = list(history_x) for idx in selected: # safety: guard against any mismatch, just in case if idx < len(history_y): y = list(history_y[idx]) ax.plot(x, y, label=DERIVED_CHANNEL_LABELS[idx]) ax.legend(loc="upper left", fontsize=8) ax.grid(True, linestyle="--", alpha=0.3) canvas.draw() # Now that update_plot exists, fix the checkbox command to point to it for chk_var, child in zip(check_vars, check_frame.winfo_children()): if isinstance(child, ttk.Checkbutton): child.config(command=update_plot) def update_panel(): """Main polling loop that consumes Serial_decoder outputs.""" values: SD.ReturnDecoder = SD.GetReturn(0.1) # non-blocking read with timeout if values is None: root.after(10, update_panel) return # Stop if input thread died if tasks[1].is_alive() is False: root.quit() match values.name: case "ADC_FULLRANK": # Update grid + history + plot update_grid(values=values.ADC_data, tick=values.timestamp) append_history(values.ADC_data, values.timestamp) update_plot() # draw right away; if too heavy, throttle with a timer case "SI_DECODER": update_SI(values) case "PR_DECODER": print(f"PRD:{values.text_respons}") case _: print(f"Error {values.name}") # Continue polling root.after(10, update_panel) # 10 ms # Start polling and GUI loop update_panel() # FC commands SD.SerialPort.write(b"SET_ST_SPEED 100\n\r") root.mainloop()