From 2db940988fc42976255c659627fb0d04ee1d0ae0 Mon Sep 17 00:00:00 2001 From: Metacube Date: Sun, 5 Apr 2026 18:04:13 +0200 Subject: [PATCH] Update print statement from 'Hello' to 'Goodbye' --- vu1_gui.py | 1533 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1533 insertions(+) create mode 100644 vu1_gui.py diff --git a/vu1_gui.py b/vu1_gui.py new file mode 100644 index 0000000..f363485 --- /dev/null +++ b/vu1_gui.py @@ -0,0 +1,1533 @@ +#!/usr/bin/env python3 +""" +VU1 Meter Web GUI v3 +==================== +Skeuomorphes VU-Meter mit echter Galvanometer-Physik, +Audio-Passthrough und Physik-Visualisierung. +Öffne http://localhost:8080 im Browser. +""" + +import requests, time, argparse, numpy as np, sys, math +import threading, json, psutil +from flask import Flask, render_template_string, jsonify, request as flask_request + +try: + import sounddevice as sd +except ImportError: + print("❌ sounddevice nicht installiert!") + sys.exit(1) + +app = Flask(__name__) + +# ── Globals ─────────────────────────────────────────────────── +meter = None +client = None +dial_uid = None +cpu_dial_uid = None +disk_dial_uid = None +running = False +current_level = 0 +current_peak = 0 + +# ══════════════════════════════════════════════════════════════ +# HTML / CSS / JS — Skeuomorphes VU-Meter +# ══════════════════════════════════════════════════════════════ + +HTML_PAGE = r""" + + + + + +VU1 Meter + + + +
+ + +
+
+
+
+
+
+ +
+
+ + +
+
0%
+
+ pk + 0% +
+
— dB
+
+ + +
+ + +
+
Audio
+
+ + +
+
+ + +
+ +
+ + + + +
+ +
Modus
+
+ + + +
+ + +
+ + + + + + +
+ + + + + + + + +
+
Physik
+
+ + + 0.80 +
+
+ + + 1.20 +
+
+ + + 3.00 +
+
+ + + 0.00 +
+
+ + + -40 +
+
+ + +
+ +
+ + +
+ + +
+
bereit
+ + +
+
Weitere Dials
+
+
+
+
+ + + + +""" + + +# ══════════════════════════════════════════════════════════════ +# VU1 Hardware Client +# ══════════════════════════════════════════════════════════════ + +class VU1Client: + def __init__(self, host="localhost", port=5340, api_key=""): + self.base_url = f"http://{host}:{port}" + self.api_key = api_key + + def get_dials(self): + try: + url = f"{self.base_url}/api/v0/dial/list" + params = {"key": self.api_key} if self.api_key else {} + r = requests.get(url, params=params, timeout=5) + data = r.json() + if data.get("status") == "ok": + return data.get("data", []) + except Exception as e: + print(f"Dial-Fehler: {e}") + return [] + + def set_dial_value(self, uid, value): + value = max(0, min(100, int(value))) + try: + url = f"{self.base_url}/api/v0/dial/{uid}/set" + params = {"value": value} + if self.api_key: + params["key"] = self.api_key + requests.get(url, params=params, timeout=0.5) + except Exception: + pass + + +# ══════════════════════════════════════════════════════════════ +# PhysicsVU — Audio Engine + Meter +# ══════════════════════════════════════════════════════════════ + +class PhysicsVU: + def __init__(self, device_in=None, device_out=None): + self.device_in = device_in + self.device_out = device_out + self.stream = None + self.sample_rate = 44100 + + # Physik + self.mass = 0.8 + self.damping = 1.2 + self.spring = 3.0 + self.gravity = 0.0 + self.sensitivity = -40 + + # Nadel-State + self.needle_pos = 0.0 + self.needle_vel = 0.0 + self.peak_pos = 0.0 + self.peak_hold_t = 0.0 + self._last_time = None + + # Physik-Visualisierung: Kräfte für Frontend + self._f_spring = 0.0 + self._f_damping = 0.0 + self._f_gravity = 0.0 + self._target = 0.0 + + # Audio + self._latest_rms = 0.0 + self._latest_peak = 0.0 + + # Ringbuffer 15ms + self._ring_size = int(self.sample_rate * 15 / 1000) + self._ring = np.zeros(self._ring_size, dtype=np.float32) + self._ring_idx = 0 + + # Mode + self.mode = 'full' + + # Monitor / Bypass / Solo + self.monitor = False + self.bypass = False + self.solo = 'off' # 'off','lo','hi' + + # Dual-Band + self.band_crossover = 250.0 + self.band_lo_weight = 0.6 + self.band_hi_weight = 0.4 + self._bq_lo_z = np.zeros(2) + self._bq_hi_z = np.zeros(2) + self._bq_lo, self._bq_hi = self._calc_biquad(self.band_crossover) + self._ring_lo = np.zeros(self._ring_size, dtype=np.float32) + self._ring_hi = np.zeros(self._ring_size, dtype=np.float32) + self._latest_rms_lo = 0.0 + self._latest_rms_hi = 0.0 + + # IEC True + self._iec_coeffs = self._calc_biquad_lpf(2.224, 0.6053) + self._iec_z = np.zeros(2) + self._iec_level = 0.0 + + # ── Biquad helpers ── + def _calc_biquad(self, fc): + w0 = 2.0 * math.pi * fc / self.sample_rate + alpha = math.sin(w0) / (2.0 * 0.7071) + cos_w0 = math.cos(w0) + b0_lp = (1.0 - cos_w0) / 2.0 + b1_lp = 1.0 - cos_w0 + a0 = 1.0 + alpha + a1 = -2.0 * cos_w0 + a2 = 1.0 - alpha + lp = np.array([b0_lp/a0, b1_lp/a0, b0_lp/a0, a1/a0, a2/a0], dtype=np.float64) + b0_hp = (1.0 + cos_w0) / 2.0 + b1_hp = -(1.0 + cos_w0) + hp = np.array([b0_hp/a0, b1_hp/a0, b0_hp/a0, a1/a0, a2/a0], dtype=np.float64) + return lp, hp + + def _calc_biquad_lpf(self, fc, Q): + w0 = 2.0 * math.pi * fc / self.sample_rate + alpha = math.sin(w0) / (2.0 * Q) + cos_w0 = math.cos(w0) + b0 = (1.0 - cos_w0) / 2.0 + a0 = 1.0 + alpha + a1 = -2.0 * cos_w0 + a2 = 1.0 - alpha + return np.array([b0/a0, (1.0-cos_w0)/a0, b0/a0, a1/a0, a2/a0], dtype=np.float64) + + @staticmethod + def _biquad_process(coeffs, z, x): + b0, b1, b2, a1, a2 = coeffs + out = np.empty_like(x, dtype=np.float64) + z1, z2 = z[0], z[1] + for i in range(len(x)): + xi = float(x[i]) + yi = b0*xi + z1 + z1 = b1*xi - a1*yi + z2 + z2 = b2*xi - a2*yi + out[i] = yi + z[0], z[1] = z1, z2 + return out + + # ── Audio Callback (kombinierter I/O Stream) ── + def _callback(self, indata, outdata, frames, time_info, status): + mono = indata[:, 0] if indata.ndim > 1 else indata.flatten() + + # Peak + self._latest_peak = float(np.max(np.abs(mono))) + + # Ringbuffer + n = len(mono) + end = self._ring_idx + n + if end <= self._ring_size: + self._ring[self._ring_idx:end] = mono + else: + first = self._ring_size - self._ring_idx + self._ring[self._ring_idx:] = mono[:first] + self._ring[:n - first] = mono[first:] + + # Dual-Band Filter + lo_block = None + hi_block = None + if self.mode == 'dualband' or self.solo != 'off': + lo_block = self._biquad_process(self._bq_lo, self._bq_lo_z, mono.astype(np.float64)) + hi_block = self._biquad_process(self._bq_hi, self._bq_hi_z, mono.astype(np.float64)) + if end <= self._ring_size: + self._ring_lo[self._ring_idx:end] = lo_block + self._ring_hi[self._ring_idx:end] = hi_block + else: + first = self._ring_size - self._ring_idx + self._ring_lo[self._ring_idx:] = lo_block[:first] + self._ring_lo[:n-first] = lo_block[first:] + self._ring_hi[self._ring_idx:] = hi_block[:first] + self._ring_hi[:n-first] = hi_block[first:] + self._latest_rms_lo = float(np.sqrt(np.mean(self._ring_lo**2))) + self._latest_rms_hi = float(np.sqrt(np.mean(self._ring_hi**2))) + + self._ring_idx = (self._ring_idx + n) % self._ring_size + self._latest_rms = float(np.sqrt(np.mean(self._ring**2))) + + # IEC True + if self.mode == 'iec_true': + rect = np.abs(mono).astype(np.float64) + filt = self._biquad_process(self._iec_coeffs, self._iec_z, rect) + self._iec_level = float(filt[-1]) + + # ── Audio Output ── + # Monitor: Signal hören (mit Filter/Solo je nach Mode) + # Bypass: Signal hören (raw Stereo, ungefiltert) + # Keins: Stille + if outdata is not None: + out_ch = outdata.shape[1] + in_ch = indata.shape[1] if indata.ndim > 1 else 1 + n_frames = min(outdata.shape[0], indata.shape[0]) + + def _stereo_copy(): + """Originales Stereo-Signal direkt durchreichen.""" + if in_ch >= out_ch: + outdata[:n_frames] = indata[:n_frames, :out_ch] + else: + for ch in range(out_ch): + outdata[:n_frames, ch] = indata[:n_frames, min(ch, in_ch-1)] + + if self.bypass: + # Bypass: immer raw Stereo, egal ob Monitor an/aus + _stereo_copy() + + elif self.monitor: + # Monitor: gefiltertes Signal je nach Solo-Modus + if self.solo == 'lo' and lo_block is not None: + solo_f32 = lo_block.astype(np.float32) + for ch in range(out_ch): + outdata[:, ch] = solo_f32[:n_frames] + elif self.solo == 'hi' and hi_block is not None: + solo_f32 = hi_block.astype(np.float32) + for ch in range(out_ch): + outdata[:, ch] = solo_f32[:n_frames] + else: + # Monitor ohne Solo: Stereo durchreichen + _stereo_copy() + + else: + # Weder Monitor noch Bypass: Stille + outdata[:] = 0 + + # ── Stream Control ── + def start(self): + try: + in_info = sd.query_devices(self.device_in) + in_ch = min(2, in_info['max_input_channels']) + if in_ch == 0: + print("❌ Kein Input-Kanal!") + return False + + out_ch = 0 + if self.device_out is not None and self.device_out >= 0: + out_info = sd.query_devices(self.device_out) + out_ch = min(2, out_info['max_output_channels']) + + if out_ch > 0: + self.stream = sd.Stream( + device=(self.device_in, self.device_out), + channels=(in_ch, out_ch), + callback=self._callback, + blocksize=128, + samplerate=self.sample_rate + ) + else: + # Input-only — wrap callback to match signature + def input_only_cb(indata, frames, time_info, status): + self._callback(indata, None, frames, time_info, status) + self.stream = sd.InputStream( + device=self.device_in, + channels=in_ch, + callback=input_only_cb, + blocksize=128, + samplerate=self.sample_rate + ) + + self.stream.start() + self._last_time = time.perf_counter() + print("✅ Audio gestartet") + return True + except Exception as e: + print(f"Audio-Fehler: {e}") + return False + + def stop(self): + if self.stream: + self.stream.stop() + self.stream.close() + self.stream = None + + # ── dB Mapping ── + def _rms_to_percent(self, rms): + if rms < 1e-7: + return 0.0 + db = 20.0 * math.log10(rms) + db_clamped = max(self.sensitivity, min(0.0, db)) + linear = (db_clamped - self.sensitivity) / (-self.sensitivity) + return (linear ** 1.3) * 100.0 + + # ── get_level ── + def get_level(self): + now = time.perf_counter() + if self._last_time is None: + self._last_time = now + dt = now - self._last_time + self._last_time = now + dt = min(dt, 0.05) + + # IEC True + if self.mode == 'iec_true': + corrected = self._iec_level * (math.pi / 2.0) + self.needle_pos = self._rms_to_percent(corrected) + self.needle_pos = max(0.0, min(100.0, self.needle_pos)) + self._target = self.needle_pos + self._f_spring = 0; self._f_damping = 0; self._f_gravity = 0 + + peak_t = self._rms_to_percent(self._latest_peak) + if peak_t > self.peak_pos: + self.peak_pos = peak_t; self.peak_hold_t = 1.5 + else: + if self.peak_hold_t > 0: self.peak_hold_t -= dt + else: self.peak_pos -= 20.0 * dt + self.peak_pos = max(0.0, min(100.0, self.peak_pos)) + return self.needle_pos, self.peak_pos + + # Target + if self.mode == 'dualband': + target = min(100.0, self._rms_to_percent(self._latest_rms_lo)*self.band_lo_weight + + self._rms_to_percent(self._latest_rms_hi)*self.band_hi_weight) + else: + target = self._rms_to_percent(self._latest_rms) + self._target = target + + # Spring-mass-damper + self._f_spring = self.spring * (target - self.needle_pos) + self._f_damping = -self.damping * self.needle_vel + self._f_gravity = -self.gravity + + f_total = self._f_spring + self._f_damping + self._f_gravity + acc = f_total / max(0.01, self.mass) + self.needle_vel += acc * dt + self.needle_pos += self.needle_vel * dt + + if self.needle_pos < 0: + self.needle_pos = 0; self.needle_vel = max(0, self.needle_vel) + elif self.needle_pos > 100: + self.needle_pos = 100; self.needle_vel = min(0, self.needle_vel) + + peak_t = self._rms_to_percent(self._latest_peak) + if peak_t > self.peak_pos: + self.peak_pos = peak_t; self.peak_hold_t = 1.5 + else: + if self.peak_hold_t > 0: self.peak_hold_t -= dt + else: self.peak_pos -= 20.0 * dt + self.peak_pos = max(0.0, min(100.0, self.peak_pos)) + + return self.needle_pos, self.peak_pos + + +# ══════════════════════════════════════════════════════════════ +# Flask Routes +# ══════════════════════════════════════════════════════════════ + +@app.route('/') +def index(): + return render_template_string(HTML_PAGE) + +@app.route('/devices') +def list_devices(): + inputs, outputs = [], [] + for i, d in enumerate(sd.query_devices()): + if d['max_input_channels'] > 0: + inputs.append({"id": i, "name": d['name']}) + if d['max_output_channels'] > 0: + outputs.append({"id": i, "name": d['name']}) + return jsonify({"inputs": inputs, "outputs": outputs}) + +@app.route('/current_device') +def current_device(): + global meter + in_id = meter.device_in if meter else app.config.get('audio_device_in', 0) + out_id = meter.device_out if meter else app.config.get('audio_device_out', -1) + return jsonify({"input": in_id, "output": out_id if out_id is not None else -1}) + +@app.route('/set_device//') +def set_device(which, dev_id): + global meter, running + if which == 'in': + if running: + return jsonify({"ok": False, "error": "Zuerst stoppen"}) + if meter: + meter.stop() + meter.device_in = dev_id + app.config['audio_device_in'] = dev_id + elif which == 'out': + if meter: + was_running = running + if running: + running = False + meter.stop() + meter.device_out = dev_id if dev_id >= 0 else None + app.config['audio_device_out'] = dev_id + if was_running: + if meter.start(): + running = True + threading.Thread(target=update_loop, daemon=True).start() + return jsonify({"ok": True}) + +@app.route('/toggle') +def toggle(): + global running, meter, current_level + if running: + running = False + if meter: meter.stop() + return jsonify({"running": False}) + else: + if not meter: + meter = PhysicsVU( + device_in=app.config.get('audio_device_in'), + device_out=app.config.get('audio_device_out') + ) + if not meter.stream: + if not meter.start(): + return jsonify({"running": False, "error": "Audio failed"}) + running = True + threading.Thread(target=update_loop, daemon=True).start() + return jsonify({"running": True}) + +@app.route('/level') +def get_level_route(): + global current_level, current_peak, meter + rms = meter._latest_rms if meter else 0.0 + db = round(20 * math.log10(rms), 1) if rms > 1e-7 else -120 + resp = {"level": current_level, "peak": current_peak, "db": db} + if meter: + resp["phys"] = { + "pos": round(meter.needle_pos, 2), + "vel": round(meter.needle_vel, 2), + "target": round(meter._target, 2), + "f_spring": round(meter._f_spring, 2), + "f_damping": round(meter._f_damping, 2), + "f_gravity": round(meter._f_gravity, 2) + } + if meter.mode == 'dualband': + rlo = meter._latest_rms_lo + rhi = meter._latest_rms_hi + resp["db_lo"] = round(20*math.log10(rlo), 1) if rlo > 1e-7 else -120 + resp["db_hi"] = round(20*math.log10(rhi), 1) if rhi > 1e-7 else -120 + return jsonify(resp) + +@app.route('/extra_status') +def extra_status(): + result = [] + cpu = int(psutil.cpu_percent(interval=None)) + result.append({"label":"CPU","found":bool(cpu_dial_uid),"value":cpu if cpu_dial_uid else 0, + "uid_short":cpu_dial_uid[:8]+"…" if cpu_dial_uid else "—", + "status":"aktiv" if cpu_dial_uid else "kein Dial"}) + disk = int(psutil.disk_usage('/').percent) + result.append({"label":"Disk /","found":bool(disk_dial_uid),"value":disk if disk_dial_uid else 0, + "uid_short":disk_dial_uid[:8]+"…" if disk_dial_uid else "—", + "status":"aktiv" if disk_dial_uid else "kein Dial"}) + return jsonify(result) + +@app.route('/set//') +def set_param(param, value): + global meter + if not meter: return jsonify({"ok": False}) + if param == 'mode': + meter.mode = value + if value == 'iec_true': meter._iec_z[:] = 0 + elif param == 'monitor': + meter.monitor = value == '1' + elif param == 'bypass': + meter.bypass = value == '1' + elif param == 'solo': + meter.solo = value + elif param == 'crossover': + meter.band_crossover = float(value) + meter._bq_lo, meter._bq_hi = meter._calc_biquad(float(value)) + meter._bq_lo_z[:] = 0; meter._bq_hi_z[:] = 0 + elif param == 'lo_weight': + meter.band_lo_weight = float(value) + elif param == 'hi_weight': + meter.band_hi_weight = float(value) + else: + v = float(value) + if param == 'mass': meter.mass = v + elif param == 'damping': meter.damping = v + elif param == 'spring': meter.spring = v + elif param == 'gravity': meter.gravity = v + elif param == 'sensitivity': meter.sensitivity = v + return jsonify({"ok": True}) + +@app.route('/reset') +def reset(): + global meter + if meter: + meter.mass=0.8; meter.damping=1.2; meter.spring=3.0 + meter.gravity=0.0; meter.sensitivity=-40 + meter.needle_pos=0; meter.needle_vel=0; meter.peak_pos=0 + meter.mode='full'; meter.monitor=False; meter.bypass=False; meter.solo='off' + meter.band_crossover=250; meter.band_lo_weight=0.6; meter.band_hi_weight=0.4 + meter._iec_z[:]=0; meter._iec_level=0 + return jsonify({"ok": True}) + + +# ── Helpers ── + +def value_to_backlight(pct): + if pct < 60: return {"red":0,"green":100,"blue":0} + elif pct < 80: return {"red":100,"green":60,"blue":0} + else: return {"red":100,"green":0,"blue":0} + +def send_metric(uid, value): + try: + client.set_dial_value(uid, value) + bl = value_to_backlight(value) + url = f"{client.base_url}/api/v0/dial/{uid}/backlight" + requests.get(url, params={**bl, "key": client.api_key}, timeout=0.5) + except Exception: pass + +def update_loop(): + global running, current_level, current_peak, meter, client + global dial_uid, cpu_dial_uid, disk_dial_uid + psutil.cpu_percent(interval=None) + while running: + if meter: + current_level, current_peak = meter.get_level() + if client and dial_uid: + threading.Thread(target=send_metric, args=(dial_uid, current_level), daemon=True).start() + if client and cpu_dial_uid: + cpu = int(psutil.cpu_percent(interval=None)) + threading.Thread(target=send_metric, args=(cpu_dial_uid, cpu), daemon=True).start() + if client and disk_dial_uid: + disk = int(psutil.disk_usage('/').percent) + threading.Thread(target=send_metric, args=(disk_dial_uid, disk), daemon=True).start() + time.sleep(0.015) + + +# ── Main ── + +def main(): + global client, dial_uid, meter + + parser = argparse.ArgumentParser() + parser.add_argument("--api-key", required=True) + parser.add_argument("--audio-device", type=int, default=None) + parser.add_argument("--output-device", type=int, default=None) + parser.add_argument("--dial-name", default="CPU") + parser.add_argument("--port", type=int, default=8080) + args = parser.parse_args() + + app.config['audio_device_in'] = args.audio_device + app.config['audio_device_out'] = args.output_device + + client = VU1Client(api_key=args.api_key) + dials = client.get_dials() + + found_uid = None + for d in dials: + if d['dial_name'].upper() == args.dial_name.upper(): + found_uid = d['uid'] + print(f"✅ Audio-Dial: {d['dial_name']} ({d['uid'][:8]}…)") + break + if not found_uid: + print(f"❌ Dial '{args.dial_name}' nicht gefunden!") + sys.exit(1) + globals()['dial_uid'] = found_uid + + others = [d['uid'] for d in dials if d['uid'] != found_uid] + if len(others) >= 1: + globals()['cpu_dial_uid'] = others[0] + print(f"✅ CPU-Dial: ({others[0][:8]}…)") + if len(others) >= 2: + globals()['disk_dial_uid'] = others[1] + print(f"✅ Disk-Dial: ({others[1][:8]}…)") + + meter = PhysicsVU(device_in=args.audio_device, device_out=args.output_device) + + print(f"\n🎛️ VU1 Meter Web GUI v3") + print(f"{'='*40}") + print(f"http://localhost:{args.port}") + print(f"{'='*40}") + + app.run(host='0.0.0.0', port=args.port, debug=False, threaded=True) + + +if __name__ == "__main__": + main()