#!/usr/bin/env python3
"""
VU1 Meter Web GUI v3
====================
Skeuomorphic VU meter with real galvanometer physics, audio pass-through
and a physics visualisation.

Open http://localhost:8080 in a browser.
"""

import argparse
import json
import math
import sys
import threading
import time

import numpy as np
import psutil
import requests
from flask import Flask, jsonify, render_template_string, request as flask_request

try:
    import sounddevice as sd
except ImportError:
    print("❌ sounddevice nicht installiert!")
    sys.exit(1)

app = Flask(__name__)

# ── Globals ───────────────────────────────────────────────────
meter = None            # PhysicsVU instance (created in main() or toggle())
client = None           # VU1Client instance
dial_uid = None         # dial driven by the audio level
cpu_dial_uid = None     # optional dial showing CPU load
disk_dial_uid = None    # optional dial showing disk usage of '/'
running = False         # True while update_loop() should keep running
current_level = 0       # last needle position published by update_loop()
current_peak = 0        # last peak-hold position published by update_loop()

# ══════════════════════════════════════════════════════════════
# HTML / CSS / JS — skeuomorphic VU meter
# ══════════════════════════════════════════════════════════════
# NOTE(review): the template below contains only bare text, no markup or
# script — it looks like the HTML/CSS/JS was stripped from this copy of the
# file. It is kept verbatim; restore the real template from the original.
HTML_PAGE = r""" VU1 Meter
0%
pk 0%
— dB
Audio
Modus
Physik
0.80
1.20
3.00
0.00
-40
bereit
Weitere Dials
"""


# ══════════════════════════════════════════════════════════════
# VU1 Hardware Client
# ══════════════════════════════════════════════════════════════
class VU1Client:
    """Small HTTP client for the VU1 dial server's ``/api/v0`` endpoints."""

    def __init__(self, host="localhost", port=5340, api_key=""):
        self.base_url = f"http://{host}:{port}"
        self.api_key = api_key

    def get_dials(self):
        """Return the server's dial list, or ``[]`` on any failure.

        Failures (network error, invalid JSON, non-"ok" status) are reported
        on stdout and never raised.
        """
        try:
            url = f"{self.base_url}/api/v0/dial/list"
            params = {"key": self.api_key} if self.api_key else {}
            r = requests.get(url, params=params, timeout=5)
            data = r.json()
            if data.get("status") == "ok":
                return data.get("data", [])
        except Exception as e:
            print(f"Dial-Fehler: {e}")
        return []

    def set_dial_value(self, uid, value):
        """Set dial *uid* to *value*, clamped to the integer range 0..100.

        Best effort: this is called at a high rate from the update loop, so
        request errors are deliberately dropped instead of raised or logged.
        """
        value = max(0, min(100, int(value)))
        try:
            url = f"{self.base_url}/api/v0/dial/{uid}/set"
            params = {"value": value}
            if self.api_key:
                params["key"] = self.api_key
            requests.get(url, params=params, timeout=0.5)
        except Exception:
            pass


# ══════════════════════════════════════════════════════════════
# PhysicsVU — Audio Engine + Meter
# ══════════════════════════════════════════════════════════════
class PhysicsVU:
    """Audio capture plus a spring-mass-damper needle model.

    The sounddevice callback keeps running RMS/peak figures up to date;
    ``get_level()`` integrates the needle physics towards the level derived
    from those figures and returns ``(needle, peak)`` in percent.
    """

    def __init__(self, device_in=None, device_out=None):
        self.device_in = device_in
        self.device_out = device_out
        self.stream = None
        self.sample_rate = 44100

        # Physics parameters
        self.mass = 0.8
        self.damping = 1.2
        self.spring = 3.0
        self.gravity = 0.0
        self.sensitivity = -40      # dB value that maps to 0 %

        # Needle state
        self.needle_pos = 0.0
        self.needle_vel = 0.0
        self.peak_pos = 0.0
        self.peak_hold_t = 0.0
        self._last_time = None

        # Physics visualisation: forces exposed to the frontend
        self._f_spring = 0.0
        self._f_damping = 0.0
        self._f_gravity = 0.0
        self._target = 0.0

        # Audio
        self._latest_rms = 0.0
        self._latest_peak = 0.0

        # Ring buffer covering 15 ms of samples
        self._ring_size = int(self.sample_rate * 15 / 1000)
        self._ring = np.zeros(self._ring_size, dtype=np.float32)
        self._ring_idx = 0

        # Mode
        self.mode = 'full'

        # Monitor / Bypass / Solo
        self.monitor = False
        self.bypass = False
        self.solo = 'off'           # 'off', 'lo', 'hi'

        # Dual-band
        self.band_crossover = 250.0
        self.band_lo_weight = 0.6
        self.band_hi_weight = 0.4
        self._bq_lo_z = np.zeros(2)
        self._bq_hi_z = np.zeros(2)
        self._bq_lo, self._bq_hi = self._calc_biquad(self.band_crossover)
        self._ring_lo = np.zeros(self._ring_size, dtype=np.float32)
        self._ring_hi = np.zeros(self._ring_size, dtype=np.float32)
        self._latest_rms_lo = 0.0
        self._latest_rms_hi = 0.0

        # IEC True: low-pass applied to the rectified signal
        self._iec_coeffs = self._calc_biquad_lpf(2.224, 0.6053)
        self._iec_z = np.zeros(2)
        self._iec_level = 0.0

    # ── Biquad helpers ──
    def _calc_biquad(self, fc):
        """Return ``(lowpass, highpass)`` biquad coefficients for cutoff *fc*.

        Both filters use Q = 0.7071. Each array is ``[b0, b1, b2, a1, a2]``,
        already divided by a0.
        """
        w0 = 2.0 * math.pi * fc / self.sample_rate
        alpha = math.sin(w0) / (2.0 * 0.7071)
        cos_w0 = math.cos(w0)
        a0 = 1.0 + alpha
        a1 = -2.0 * cos_w0
        a2 = 1.0 - alpha

        b0_lp = (1.0 - cos_w0) / 2.0
        b1_lp = 1.0 - cos_w0
        lp = np.array([b0_lp / a0, b1_lp / a0, b0_lp / a0, a1 / a0, a2 / a0],
                      dtype=np.float64)

        b0_hp = (1.0 + cos_w0) / 2.0
        b1_hp = -(1.0 + cos_w0)
        hp = np.array([b0_hp / a0, b1_hp / a0, b0_hp / a0, a1 / a0, a2 / a0],
                      dtype=np.float64)
        return lp, hp

    def _calc_biquad_lpf(self, fc, Q):
        """Return low-pass biquad coefficients ``[b0, b1, b2, a1, a2]``."""
        w0 = 2.0 * math.pi * fc / self.sample_rate
        alpha = math.sin(w0) / (2.0 * Q)
        cos_w0 = math.cos(w0)
        b0 = (1.0 - cos_w0) / 2.0
        a0 = 1.0 + alpha
        a1 = -2.0 * cos_w0
        a2 = 1.0 - alpha
        return np.array([b0 / a0, (1.0 - cos_w0) / a0, b0 / a0, a1 / a0, a2 / a0],
                        dtype=np.float64)

    @staticmethod
    def _biquad_process(coeffs, z, x):
        """Filter block *x* with one biquad and return the float64 output.

        *z* is the two-element filter state; it is updated in place so the
        next block continues where this one stopped.
        """
        b0, b1, b2, a1, a2 = coeffs
        out = np.empty_like(x, dtype=np.float64)
        z1, z2 = z[0], z[1]
        for i, sample in enumerate(x):
            xi = float(sample)
            yi = b0 * xi + z1
            z1 = b1 * xi - a1 * yi + z2
            z2 = b2 * xi - a2 * yi
            out[i] = yi
        z[0], z[1] = z1, z2
        return out

    @staticmethod
    def _ring_write(ring, start, block):
        """Write *block* into circular buffer *ring* beginning at *start*.

        Wraps around the end of the buffer. A block at least as long as the
        ring overwrites it completely with the newest samples, positioned so
        that the caller's ``(start + len(block)) % len(ring)`` pointer update
        stays valid.
        """
        size = ring.shape[0]
        n = len(block)
        if n >= size:
            newest = block[n - size:]
            end = (start + n) % size
            ring[end:] = newest[:size - end]
            ring[:end] = newest[size - end:]
            return
        end = start + n
        if end <= size:
            ring[start:end] = block
        else:
            first = size - start
            ring[start:] = block[:first]
            ring[:n - first] = block[first:]

    # ── Audio callback (combined I/O stream) ──
    def _callback(self, indata, outdata, frames, time_info, status):
        """sounddevice callback: update level statistics and fill the output.

        *outdata* is ``None`` when running on an input-only stream.
        """
        mono = indata[:, 0] if indata.ndim > 1 else indata.flatten()
        n = len(mono)
        if n == 0:
            # Nothing to analyse; np.max() would raise on an empty block.
            if outdata is not None:
                outdata[:] = 0
            return

        # Peak
        self._latest_peak = float(np.max(np.abs(mono)))

        # Ring buffer
        start = self._ring_idx
        self._ring_write(self._ring, start, mono)

        # Dual-band filter (also needed whenever a solo band is monitored)
        lo_block = None
        hi_block = None
        if self.mode == 'dualband' or self.solo != 'off':
            mono64 = mono.astype(np.float64)
            lo_block = self._biquad_process(self._bq_lo, self._bq_lo_z, mono64)
            hi_block = self._biquad_process(self._bq_hi, self._bq_hi_z, mono64)
            self._ring_write(self._ring_lo, start, lo_block)
            self._ring_write(self._ring_hi, start, hi_block)
            self._latest_rms_lo = float(np.sqrt(np.mean(self._ring_lo ** 2)))
            self._latest_rms_hi = float(np.sqrt(np.mean(self._ring_hi ** 2)))

        self._ring_idx = (start + n) % self._ring_size
        self._latest_rms = float(np.sqrt(np.mean(self._ring ** 2)))

        # IEC True: low-pass the rectified signal, keep the last sample
        if self.mode == 'iec_true':
            rect = np.abs(mono).astype(np.float64)
            filt = self._biquad_process(self._iec_coeffs, self._iec_z, rect)
            self._iec_level = float(filt[-1])

        if outdata is not None:
            self._write_output(indata, outdata, lo_block, hi_block)

    def _write_output(self, indata, outdata, lo_block, hi_block):
        """Fill *outdata* according to the bypass / monitor / solo switches.

        * bypass:  raw input channels, unfiltered (wins over monitor)
        * monitor: the soloed band on every channel if one is selected and
                   was computed for this block, otherwise the raw input
        * neither: silence
        """
        bypass = self.bypass
        if not (bypass or self.monitor):
            outdata[:] = 0
            return

        out_ch = outdata.shape[1]
        in_ch = indata.shape[1] if indata.ndim > 1 else 1
        n_frames = min(outdata.shape[0], indata.shape[0])
        # Never leave frames beyond the input length uninitialised.
        outdata[n_frames:] = 0

        solo_block = None
        if not bypass:
            solo = self.solo
            if solo == 'lo':
                solo_block = lo_block
            elif solo == 'hi':
                solo_block = hi_block

        if solo_block is not None:
            solo_f32 = solo_block.astype(np.float32)
            for ch in range(out_ch):
                outdata[:n_frames, ch] = solo_f32[:n_frames]
            return

        # Pass the original channels straight through; when there are fewer
        # input than output channels the last input channel is repeated.
        if in_ch >= out_ch:
            outdata[:n_frames] = indata[:n_frames, :out_ch]
        else:
            for ch in range(out_ch):
                outdata[:n_frames, ch] = indata[:n_frames, min(ch, in_ch - 1)]

    # ── Stream control ──
    def start(self):
        """Open and start the audio stream; return True on success.

        Uses a combined input/output stream when an output device with at
        least one channel is configured, otherwise an input-only stream.
        Errors are printed and reported as ``False``.
        """
        try:
            if self.device_in is None:
                # query_devices(None) would return the list of ALL devices;
                # asking for kind 'input' yields the default input device.
                in_info = sd.query_devices(None, 'input')
            else:
                in_info = sd.query_devices(self.device_in)
            in_ch = min(2, in_info['max_input_channels'])
            if in_ch == 0:
                print("❌ Kein Input-Kanal!")
                return False

            out_ch = 0
            if self.device_out is not None and self.device_out >= 0:
                out_info = sd.query_devices(self.device_out)
                out_ch = min(2, out_info['max_output_channels'])

            if out_ch > 0:
                self.stream = sd.Stream(
                    device=(self.device_in, self.device_out),
                    channels=(in_ch, out_ch),
                    callback=self._callback,
                    blocksize=128,
                    samplerate=self.sample_rate
                )
            else:
                # Input-only — wrap the callback to match the signature
                def input_only_cb(indata, frames, time_info, status):
                    self._callback(indata, None, frames, time_info, status)

                self.stream = sd.InputStream(
                    device=self.device_in,
                    channels=in_ch,
                    callback=input_only_cb,
                    blocksize=128,
                    samplerate=self.sample_rate
                )
            self.stream.start()
            self._last_time = time.perf_counter()
            print("✅ Audio gestartet")
            return True
        except Exception as e:
            print(f"Audio-Fehler: {e}")
            return False

    def stop(self):
        """Stop and close the stream if one is open."""
        if self.stream:
            self.stream.stop()
            self.stream.close()
            self.stream = None

    # ── dB mapping ──
    def _rms_to_percent(self, rms):
        """Map a linear level to 0..100 % on a dB scale.

        ``self.sensitivity`` (dB, negative) maps to 0 % and 0 dB to 100 %;
        the result is shaped with an exponent of 1.3.
        """
        # A non-negative sensitivity leaves no dB range to map (and exactly
        # 0 would divide by zero below), so treat it as "no deflection".
        if rms < 1e-7 or self.sensitivity >= 0:
            return 0.0
        db = 20.0 * math.log10(rms)
        db_clamped = max(self.sensitivity, min(0.0, db))
        linear = (db_clamped - self.sensitivity) / (-self.sensitivity)
        return (linear ** 1.3) * 100.0

    def _update_peak(self, dt):
        """Advance the peak marker: jump up, hold 1.5 s, then fall 20 %/s."""
        peak_t = self._rms_to_percent(self._latest_peak)
        if peak_t > self.peak_pos:
            self.peak_pos = peak_t
            self.peak_hold_t = 1.5
        elif self.peak_hold_t > 0:
            self.peak_hold_t -= dt
        else:
            self.peak_pos -= 20.0 * dt
        self.peak_pos = max(0.0, min(100.0, self.peak_pos))

    # ── get_level ──
    def get_level(self):
        """Advance the needle simulation and return ``(needle, peak)`` in %.

        The time step is the wall-clock time since the previous call, capped
        at 50 ms so a stalled caller cannot destabilise the integration.
        """
        now = time.perf_counter()
        if self._last_time is None:
            self._last_time = now
        dt = min(now - self._last_time, 0.05)
        self._last_time = now

        # IEC True: the needle follows the filtered level directly, no physics
        if self.mode == 'iec_true':
            corrected = self._iec_level * (math.pi / 2.0)
            self.needle_pos = max(0.0, min(100.0, self._rms_to_percent(corrected)))
            self._target = self.needle_pos
            self._f_spring = 0.0
            self._f_damping = 0.0
            self._f_gravity = 0.0
            self._update_peak(dt)
            return self.needle_pos, self.peak_pos

        # Target
        if self.mode == 'dualband':
            target = min(
                100.0,
                self._rms_to_percent(self._latest_rms_lo) * self.band_lo_weight
                + self._rms_to_percent(self._latest_rms_hi) * self.band_hi_weight
            )
        else:
            target = self._rms_to_percent(self._latest_rms)
        self._target = target

        # Spring-mass-damper
        self._f_spring = self.spring * (target - self.needle_pos)
        self._f_damping = -self.damping * self.needle_vel
        self._f_gravity = -self.gravity
        f_total = self._f_spring + self._f_damping + self._f_gravity
        acc = f_total / max(0.01, self.mass)    # floor avoids division by ~0
        self.needle_vel += acc * dt
        self.needle_pos += self.needle_vel * dt

        # End stops: clamp the position and drop velocity pointing outwards
        if self.needle_pos < 0:
            self.needle_pos = 0
            self.needle_vel = max(0, self.needle_vel)
        elif self.needle_pos > 100:
            self.needle_pos = 100
            self.needle_vel = min(0, self.needle_vel)

        self._update_peak(dt)
        return self.needle_pos, self.peak_pos


# ══════════════════════════════════════════════════════════════
# Flask Routes
# ══════════════════════════════════════════════════════════════
@app.route('/')
def index():
    """Serve the single-page GUI."""
    return render_template_string(HTML_PAGE)


@app.route('/devices')
def list_devices():
    """List audio devices, split into those with inputs and with outputs."""
    inputs, outputs = [], []
    for i, d in enumerate(sd.query_devices()):
        if d['max_input_channels'] > 0:
            inputs.append({"id": i, "name": d['name']})
        if d['max_output_channels'] > 0:
            outputs.append({"id": i, "name": d['name']})
    return jsonify({"inputs": inputs, "outputs": outputs})


@app.route('/current_device')
def current_device():
    """Report the selected input/output device ids (output -1 = none)."""
    global meter
    in_id = meter.device_in if meter else app.config.get('audio_device_in', 0)
    out_id = meter.device_out if meter else app.config.get('audio_device_out', -1)
    return jsonify({"input": in_id, "output": out_id if out_id is not None else -1})


# The id is signed because the views treat a negative output id as "no output".
@app.route('/set_device/<which>/<int(signed=True):dev_id>')
def set_device(which, dev_id):
    """Select the input (``which == 'in'``) or output (``'out'``) device.

    The input can only be changed while stopped. Changing the output while
    running restarts the stream on the new device.
    """
    global meter, running
    if which == 'in':
        if running:
            return jsonify({"ok": False, "error": "Zuerst stoppen"})
        if meter:
            meter.stop()
            meter.device_in = dev_id
        app.config['audio_device_in'] = dev_id
    elif which == 'out':
        was_running = running
        if meter:
            if running:
                running = False
                meter.stop()
            meter.device_out = dev_id if dev_id >= 0 else None
        app.config['audio_device_out'] = dev_id
        if meter and was_running and meter.start():
            running = True
            threading.Thread(target=update_loop, daemon=True).start()
    return jsonify({"ok": True})


@app.route('/toggle')
def toggle():
    """Start the meter if stopped, stop it if running."""
    global running, meter, current_level
    if running:
        running = False
        if meter:
            meter.stop()
        return jsonify({"running": False})

    if not meter:
        meter = PhysicsVU(
            device_in=app.config.get('audio_device_in'),
            device_out=app.config.get('audio_device_out')
        )
    if not meter.stream and not meter.start():
        return jsonify({"running": False, "error": "Audio failed"})
    running = True
    threading.Thread(target=update_loop, daemon=True).start()
    return jsonify({"running": True})


@app.route('/level')
def get_level_route():
    """Return the current level, peak, dB value and physics state as JSON."""
    global current_level, current_peak, meter
    rms = meter._latest_rms if meter else 0.0
    db = round(20 * math.log10(rms), 1) if rms > 1e-7 else -120
    resp = {"level": current_level, "peak": current_peak, "db": db}
    if meter:
        resp["phys"] = {
            "pos": round(meter.needle_pos, 2),
            "vel": round(meter.needle_vel, 2),
            "target": round(meter._target, 2),
            "f_spring": round(meter._f_spring, 2),
            "f_damping": round(meter._f_damping, 2),
            "f_gravity": round(meter._f_gravity, 2)
        }
        if meter.mode == 'dualband':
            rlo = meter._latest_rms_lo
            rhi = meter._latest_rms_hi
            resp["db_lo"] = round(20 * math.log10(rlo), 1) if rlo > 1e-7 else -120
            resp["db_hi"] = round(20 * math.log10(rhi), 1) if rhi > 1e-7 else -120
    return jsonify(resp)


@app.route('/extra_status')
def extra_status():
    """Report the CPU and disk dials: whether assigned, and their values."""
    result = []
    cpu = int(psutil.cpu_percent(interval=None))
    result.append({
        "label": "CPU",
        "found": bool(cpu_dial_uid),
        "value": cpu if cpu_dial_uid else 0,
        "uid_short": cpu_dial_uid[:8] + "…" if cpu_dial_uid else "—",
        "status": "aktiv" if cpu_dial_uid else "kein Dial"
    })
    disk = int(psutil.disk_usage('/').percent)
    result.append({
        "label": "Disk /",
        "found": bool(disk_dial_uid),
        "value": disk if disk_dial_uid else 0,
        "uid_short": disk_dial_uid[:8] + "…" if disk_dial_uid else "—",
        "status": "aktiv" if disk_dial_uid else "kein Dial"
    })
    return jsonify(result)


@app.route('/set/<param>/<value>')
def set_param(param, value):
    """Set one meter parameter from the URL.

    String parameters: ``mode``, ``solo``. Flags (``'1'`` = on): ``monitor``,
    ``bypass``. Everything else is parsed as a float; unparsable or
    non-finite numbers are rejected with ``{"ok": false}`` instead of
    failing the request or poisoning the physics with NaN. Unknown numeric
    parameters are accepted and ignored.
    """
    global meter
    if not meter:
        return jsonify({"ok": False})

    if param == 'mode':
        meter.mode = value
        if value == 'iec_true':
            meter._iec_z[:] = 0
        return jsonify({"ok": True})
    if param == 'monitor':
        meter.monitor = value == '1'
        return jsonify({"ok": True})
    if param == 'bypass':
        meter.bypass = value == '1'
        return jsonify({"ok": True})
    if param == 'solo':
        meter.solo = value
        return jsonify({"ok": True})

    try:
        v = float(value)
    except ValueError:
        return jsonify({"ok": False, "error": "invalid value"})
    if not math.isfinite(v):
        return jsonify({"ok": False, "error": "invalid value"})

    if param == 'crossover':
        # The biquad design is only meaningful between 0 Hz and Nyquist.
        if not 0.0 < v < meter.sample_rate / 2.0:
            return jsonify({"ok": False, "error": "invalid value"})
        meter.band_crossover = v
        meter._bq_lo, meter._bq_hi = meter._calc_biquad(v)
        meter._bq_lo_z[:] = 0
        meter._bq_hi_z[:] = 0
    elif param == 'lo_weight':
        meter.band_lo_weight = v
    elif param == 'hi_weight':
        meter.band_hi_weight = v
    elif param == 'mass':
        meter.mass = v
    elif param == 'damping':
        meter.damping = v
    elif param == 'spring':
        meter.spring = v
    elif param == 'gravity':
        meter.gravity = v
    elif param == 'sensitivity':
        meter.sensitivity = v
    return jsonify({"ok": True})


@app.route('/reset')
def reset():
    """Restore the default physics, mode and band settings; zero the needle."""
    global meter
    if meter:
        meter.mass = 0.8
        meter.damping = 1.2
        meter.spring = 3.0
        meter.gravity = 0.0
        meter.sensitivity = -40
        meter.needle_pos = 0
        meter.needle_vel = 0
        meter.peak_pos = 0
        meter.peak_hold_t = 0.0
        meter.mode = 'full'
        meter.monitor = False
        meter.bypass = False
        meter.solo = 'off'
        meter.band_crossover = 250
        meter.band_lo_weight = 0.6
        meter.band_hi_weight = 0.4
        # The filters must follow the restored crossover, otherwise they
        # keep running at whatever frequency was set before the reset.
        meter._bq_lo, meter._bq_hi = meter._calc_biquad(meter.band_crossover)
        meter._bq_lo_z[:] = 0
        meter._bq_hi_z[:] = 0
        meter._iec_z[:] = 0
        meter._iec_level = 0
    return jsonify({"ok": True})


# ── Helpers ──
def value_to_backlight(pct):
    """Map a percentage to a backlight colour: green < 60 <= amber < 80 <= red."""
    if pct < 60:
        return {"red": 0, "green": 100, "blue": 0}
    if pct < 80:
        return {"red": 100, "green": 60, "blue": 0}
    return {"red": 100, "green": 0, "blue": 0}


def send_metric(uid, value):
    """Send *value* and the matching backlight colour to dial *uid*.

    Best effort: runs on throw-away threads at a high rate, so failures are
    deliberately ignored.
    """
    try:
        client.set_dial_value(uid, value)
        bl = value_to_backlight(value)
        url = f"{client.base_url}/api/v0/dial/{uid}/backlight"
        requests.get(url, params={**bl, "key": client.api_key}, timeout=0.5)
    except Exception:
        pass


def update_loop():
    """Worker thread: step the meter every 15 ms and push values to the dials.

    Runs until the module-level ``running`` flag is cleared. Each dial update
    is sent from its own daemon thread so slow HTTP calls never stall the
    physics step.
    """
    global running, current_level, current_peak, meter, client
    global dial_uid, cpu_dial_uid, disk_dial_uid
    # Prime psutil: later interval=None calls measure since the previous call.
    psutil.cpu_percent(interval=None)
    while running:
        if meter:
            current_level, current_peak = meter.get_level()
            if client and dial_uid:
                threading.Thread(target=send_metric,
                                 args=(dial_uid, current_level),
                                 daemon=True).start()
        if client and cpu_dial_uid:
            cpu = int(psutil.cpu_percent(interval=None))
            threading.Thread(target=send_metric,
                             args=(cpu_dial_uid, cpu),
                             daemon=True).start()
        if client and disk_dial_uid:
            disk = int(psutil.disk_usage('/').percent)
            threading.Thread(target=send_metric,
                             args=(disk_dial_uid, disk),
                             daemon=True).start()
        time.sleep(0.015)


# ── Main ──
def main():
    """Parse arguments, look up the dials and start the web server.

    Exits with status 1 when the dial named by ``--dial-name`` is not found.
    The first two remaining dials become the CPU and disk dials.
    """
    global client, dial_uid, cpu_dial_uid, disk_dial_uid, meter

    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--audio-device", type=int, default=None)
    parser.add_argument("--output-device", type=int, default=None)
    parser.add_argument("--dial-name", default="CPU")
    parser.add_argument("--port", type=int, default=8080)
    args = parser.parse_args()

    app.config['audio_device_in'] = args.audio_device
    app.config['audio_device_out'] = args.output_device

    client = VU1Client(api_key=args.api_key)
    dials = client.get_dials()

    found_uid = None
    wanted = args.dial_name.upper()
    for d in dials:
        if d['dial_name'].upper() == wanted:
            found_uid = d['uid']
            print(f"✅ Audio-Dial: {d['dial_name']} ({d['uid'][:8]}…)")
            break
    if not found_uid:
        print(f"❌ Dial '{args.dial_name}' nicht gefunden!")
        sys.exit(1)
    dial_uid = found_uid

    others = [d['uid'] for d in dials if d['uid'] != found_uid]
    if len(others) >= 1:
        cpu_dial_uid = others[0]
        print(f"✅ CPU-Dial: ({others[0][:8]}…)")
    if len(others) >= 2:
        disk_dial_uid = others[1]
        print(f"✅ Disk-Dial: ({others[1][:8]}…)")

    meter = PhysicsVU(device_in=args.audio_device, device_out=args.output_device)

    print("\n🎛️ VU1 Meter Web GUI v3")
    print("=" * 40)
    print(f"http://localhost:{args.port}")
    print("=" * 40)

    app.run(host='0.0.0.0', port=args.port, debug=False, threaded=True)


if __name__ == "__main__":
    main()