#!/usr/bin/env python3
"""HTTP-controlled FM radio daemon.

Audio chain: per-track ffmpeg decoder -> persistent ffmpeg WAV wrapper ->
pifmadv. A Flask API manages the play queue, a worker thread plays it, and a
watchdog thread restarts the chain when its processes disappear.
"""
import contextlib
import hashlib
import logging
import os
import random
import sqlite3
import subprocess
import threading
import time
from datetime import datetime, timezone

import psutil
from flask import Flask, jsonify, request, send_from_directory

# -----------------------------
# Configuration
# -----------------------------
MUSIC_DIR = "/media/usb0/music"
STATION_ID_DIR = "/usr/local/share/music_api/station_ids"
CACHE_DIR = os.path.join(MUSIC_DIR, ".cache")
ALBUM_ART_CACHE_DIR = os.path.join(CACHE_DIR, "albumart")
FREQ = "103.1"
STATION_NAME = "HushPup"
RDS_FIFO = "/tmp/pifm_rds"
STATION_ID_INTERVAL = 600  # 10 minutes
STATION_ID_SONG_THRESHOLD = 3

# Audio format for decoder/wrapper
SAMPLE_RATE = 44100
CHANNELS = 2
SAMPLE_WIDTH = 2  # bytes per sample (16-bit)
SILENCE_MS_AFTER_TRACK = 50

AUDIO_EXTENSIONS = (".mp3", ".flac", ".wav", ".m4a", ".ogg", ".aac")

# key: relative path -> value: {"id","filename","path","duration","album_art"}
TRACKS = {}
DB_PATH = os.path.join(CACHE_DIR, "track_cache.sqlite3")
_db_lock = threading.Lock()

# -----------------------------
# State
# -----------------------------
queue = []
current_track = None
stop_flag = False
skip_flag = False
shuffle_mode = False
repeat_mode = "off"  # off, repeat_one, repeat_all
pifm_proc = None
rds_pipe = None
wav_wrap = None
last_station_id_time = time.time()
songs_since_last_id = 0
queue_lock = threading.Lock()
# Serialises writes into the wrapper's stdin. Kept separate from queue_lock so
# a pipe write that blocks on back-pressure never stalls the API handlers.
_wrapper_write_lock = threading.Lock()
restart_lock = threading.Lock()

app = Flask(__name__)


# -----------------------------
# Helpers
# -----------------------------
@contextlib.contextmanager
def _db_connection(**connect_kwargs):
    """Yield a SQLite connection to DB_PATH while holding ``_db_lock``.

    The connection is always closed on exit. Extra keyword arguments are
    forwarded to ``sqlite3.connect``.
    """
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30, **connect_kwargs)
        try:
            yield conn
        finally:
            conn.close()


def init_db():
    """Create the cache directories, the ``tracks`` table and its index."""
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)
    with _db_connection(isolation_level=None) as conn:
        # WAL for concurrency and faster writes
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tracks (
                path TEXT PRIMARY KEY,
                mtime INTEGER,
                size INTEGER,
                duration REAL,
                album_art TEXT,
                sha1 TEXT,
                updated_at TEXT
            );
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_mtime ON tracks(mtime);")


def db_get_entry(path):
    """Return the cached row for ``path`` as a dict, or None if absent."""
    with _db_connection() as conn:
        row = conn.execute(
            "SELECT mtime,size,duration,album_art,sha1 FROM tracks WHERE path = ?",
            (path,),
        ).fetchone()
    if not row:
        return None
    return {
        "mtime": row[0],
        "size": row[1],
        "duration": row[2],
        "album_art": row[3],
        "sha1": row[4],
    }


def db_upsert_entry(path, mtime, size, duration, album_art, sha1=None):
    """Insert or update the cache row for ``path``; errors propagate."""
    now = datetime.now(timezone.utc).isoformat()
    with _db_connection() as conn:
        # The connection context manager commits on success and rolls back on
        # error, without raising when no transaction is open.
        with conn:
            conn.execute(
                """
                INSERT INTO tracks(path,mtime,size,duration,album_art,sha1,updated_at)
                VALUES(?,?,?,?,?,?,?)
                ON CONFLICT(path) DO UPDATE SET
                    mtime=excluded.mtime,
                    size=excluded.size,
                    duration=excluded.duration,
                    album_art=excluded.album_art,
                    sha1=excluded.sha1,
                    updated_at=excluded.updated_at
                """,
                (path, mtime, size, duration, album_art, sha1, now),
            )


def db_delete_paths(paths):
    """Delete multiple paths (iterable) in a single transaction."""
    with _db_connection() as conn:
        with conn:
            conn.executemany(
                "DELETE FROM tracks WHERE path = ?", ((p,) for p in paths)
            )


def prune_stale_db_entries(valid_paths_set):
    """
    Remove DB rows whose path is not in valid_paths_set.
    This keeps the DB clean when files are removed or renamed.
    """
    with _db_connection() as conn:
        db_paths = {row[0] for row in conn.execute("SELECT path FROM tracks")}

    stale = sorted(db_paths - set(valid_paths_set))
    if not stale:
        return

    # Delete stale rows in batches
    batch_size = 200
    for start in range(0, len(stale), batch_size):
        db_delete_paths(stale[start:start + batch_size])
    app.logger.info("prune_stale_db_entries: removed %d stale entries", len(stale))


def compute_sha1(path, chunk_size=1024 * 1024):
    """Return the hex SHA-1 of the file at ``path``, or None if unreadable."""
    digest = hashlib.sha1()
    try:
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                digest.update(chunk)
    except OSError:
        return None
    return digest.hexdigest()


def open_fifo_nonblocking(path):
    """Open ``path`` read/write and non-blocking; return a line-buffered writer."""
    fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)
    return os.fdopen(fd, "w", buffering=1)


def pick_station_id_file():
    """Return the path of a random file in STATION_ID_DIR, or None."""
    if not os.path.isdir(STATION_ID_DIR):
        return None
    files = [
        f for f in os.listdir(STATION_ID_DIR)
        if os.path.isfile(os.path.join(STATION_ID_DIR, f))
    ]
    if not files:
        return None
    return os.path.join(STATION_ID_DIR, random.choice(files))


def pick_random_track():
    """Return a random music track (relative path) from TRACKS."""
    if not TRACKS:
        return None
    return random.choice(list(TRACKS))


def set_rds(text: str):
    """Write ``text`` (at most 64 chars) to the RDS pipe; best effort."""
    try:
        if rds_pipe:
            rds_pipe.write(text[:64] + "\n")
            rds_pipe.flush()
    except Exception:
        # Deliberately best-effort: a failed RDS update must not stop playback.
        pass


def _safe_art_name(rel_path: str) -> str:
    """Stable filesystem-safe name for album art based on relative path."""
    h = hashlib.sha1(rel_path.encode("utf-8")).hexdigest()
    return f"{h}.jpg"


def _safe_hash_name(rel_path: str) -> str:
    """Alias of ``_safe_art_name``, kept for backward compatibility."""
    return _safe_art_name(rel_path)


# -----------------------------
# Persistent WAV wrapper + pifmadv
# -----------------------------
def start_wav_wrapper():
    """
    Start a persistent ffmpeg process that reads raw PCM from stdin
    and writes a WAV stream to stdout. The wrapper stdin remains open across tracks.
    Does nothing if the wrapper is already running.
    """
    global wav_wrap
    if wav_wrap and wav_wrap.poll() is None:
        return
    wrapper_args = [
        "ffmpeg",
        "-f", "s16le",  # raw PCM input
        "-ac", str(CHANNELS),
        "-ar", str(SAMPLE_RATE),
        "-i", "-",  # stdin
        "-f", "wav",  # output WAV stream
        "-ac", str(CHANNELS),
        "-ar", str(SAMPLE_RATE),
        "-",  # stdout → pifmadv
    ]
    wav_wrap = subprocess.Popen(
        wrapper_args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=0,
    )


def start_pifm():
    """
    Start the persistent wrapper and pifmadv. pifmadv reads from wrapper stdout.
    Also (re)opens the RDS control FIFO for writing.
    """
    global pifm_proc, rds_pipe
    try:
        os.mkfifo(RDS_FIFO)
    except FileExistsError:
        pass

    start_wav_wrapper()

    # Replace these args with your actual pifmadv invocation
    pifm_args = [
        "pifmadv",
        "--freq", FREQ,
        "--pi", "beef",
        "--pty", "9",
        "--ps", STATION_NAME,
        "--ctl", RDS_FIFO,
        "--power", "7",
        "--preemph", "us",
        "--cutoff", "20500",
        "--audio", "-",
    ]
    # Start pifmadv reading from wrapper stdout
    pifm_proc = subprocess.Popen(
        pifm_args,
        stdin=wav_wrap.stdout,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Close wrapper stdout in parent so pifm owns the read end
    try:
        wav_wrap.stdout.close()
    except Exception:
        pass

    time.sleep(0.1)
    # Close the handle from a previous start so restarts do not pile up FDs.
    if rds_pipe is not None:
        try:
            rds_pipe.close()
        except Exception:
            pass
    rds_pipe = open_fifo_nonblocking(RDS_FIFO)


def write_silence(seconds: float):
    """Feed ``seconds`` of raw PCM silence into the WAV wrapper; best effort."""
    if not wav_wrap or wav_wrap.poll() is not None:
        return
    frames = max(0, int(SAMPLE_RATE * seconds))
    silence = bytes(frames * SAMPLE_WIDTH * CHANNELS)
    try:
        # Serialise with the pump thread so chunks are never interleaved.
        with _wrapper_write_lock:
            wav_wrap.stdin.write(silence)
            wav_wrap.stdin.flush()
    except Exception:
        # Includes BrokenPipeError when the wrapper died; the watchdog handles it.
        pass


def write_silence_ms(ms: int = SILENCE_MS_AFTER_TRACK):
    """Feed ``ms`` milliseconds of silence into the WAV wrapper."""
    write_silence(ms / 1000.0)


def probe_duration(path: str, timeout: int = 6):
    """Run ffprobe once to get duration in seconds or None on failure."""
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=timeout,
        )
        out = result.stdout.strip()
        return float(out) if out else None
    except (OSError, subprocess.SubprocessError, ValueError):
        return None


def extract_album_art(path: str, rel_path: str, timeout: int = 10):
    """
    Extract embedded album art to ALBUM_ART_CACHE_DIR and return cache filename,
    or None if no art or extraction failed.
    """
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)
    art_name = _safe_art_name(rel_path)
    art_path = os.path.join(ALBUM_ART_CACHE_DIR, art_name)

    def _has_art():
        return os.path.exists(art_path) and os.path.getsize(art_path) > 0

    # If already cached, return immediately
    if _has_art():
        return art_name

    # Try to extract the first video stream / attached pic
    try:
        # -y overwrite, -v error quiets output
        subprocess.run(
            [
                "ffmpeg", "-y", "-v", "error",
                "-i", path,
                "-an", "-vcodec", "copy",
                art_path,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=False,
        )
        if _has_art():
            return art_name
    except (OSError, subprocess.SubprocessError):
        pass

    # Cleanup any zero-length file
    try:
        if os.path.exists(art_path) and os.path.getsize(art_path) == 0:
            os.remove(art_path)
    except OSError:
        pass
    return None


def display_title(path):
    """Convert 'dir/a/b/Song - Title.ext' → 'Song - Title'."""
    if not path:
        return None
    title, _ = os.path.splitext(os.path.basename(path))
    return title


# -----------------------------
# Pump helper for decoder -> wrapper
# -----------------------------
def _pump_decoder_to_wrapper(decoder_stdout, stop_event):
    """
    Copy bytes from decoder_stdout to the wrapper's stdin until EOF, until
    stop_event is set, or until the wrapper is gone.
    Closes decoder_stdout when done; never closes the wrapper's stdin.
    """
    bufsize = 64 * 1024
    try:
        while not stop_event.is_set():
            chunk = decoder_stdout.read(bufsize)
            if not chunk:
                break
            with _wrapper_write_lock:
                if not wav_wrap or wav_wrap.poll() is not None:
                    # wrapper died; stop pumping
                    break
                wav_wrap.stdin.write(chunk)
                wav_wrap.stdin.flush()
    except Exception:
        # Broken pipe or closed stream: nothing more can be pumped.
        pass
    finally:
        try:
            decoder_stdout.close()
        except Exception:
            pass


def is_zombie(proc):
    """Return True if the psutil process is a zombie or no longer exists."""
    try:
        return proc.status() == psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return True


def _kill_processes_named(name):
    """Kill every process whose name is ``name`` or whose argv[0] contains it."""
    for proc in psutil.process_iter(['name', 'cmdline']):
        try:
            cmd = proc.info['cmdline'] or []
            if proc.info['name'] == name or (cmd and name in cmd[0]):
                print(f"[WATCHDOG] Killing {name} PID {proc.pid}")
                proc.kill()
        except Exception:
            pass


def restart_radio_chain():
    """Kill all ffmpeg and pifmadv processes, restart the chain, play an ID."""
    print("[WATCHDOG] Restarting radio chain")
    _kill_processes_named("ffmpeg")
    _kill_processes_named("pifmadv")

    time.sleep(0.5)
    print("[WATCHDOG] Restarting pifmadv")
    start_pifm()

    # Your addition: broadcast station ID
    id_file = pick_station_id_file()
    if id_file:
        print(f"[WATCHDOG] Broadcasting recovery ID: {id_file}")
        stream_file(id_file, allow_skip=False)
        write_silence(0.25)


# -----------------------------
# Robust stream_file implementation
# -----------------------------
def _abort_decoder(decoder, stop_event, pump_thread, *, force=False):
    """Stop the decoder and its pump thread, reap the child, pad with silence."""
    try:
        if force:
            decoder.kill()
        else:
            decoder.terminate()
    except Exception:
        pass
    stop_event.set()
    # Join the pump first: it closes the decoder's stdout on exit, which
    # unblocks a decoder that is stuck writing to a full pipe.
    pump_thread.join(timeout=1)
    try:
        decoder.wait(timeout=2)
    except subprocess.TimeoutExpired:
        try:
            decoder.kill()
            decoder.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
    write_silence_ms()


def stream_file(path: str, allow_skip=True) -> bool:
    """
    Decode a file to RAW PCM (s16le, stereo, SAMPLE_RATE) and pump into the
    persistent WAV wrapper stdin.
    Returns True if decoder exited with code 0 (normal EOF).
    Returns False if aborted (skip/stop), on decoder error, or if the wrapper
    is not running.
    """
    if not wav_wrap or wav_wrap.poll() is not None:
        print("wav_wrap is not running")
        return False

    # Build decoder args to emit raw PCM matching wrapper input
    decoder_args = [
        "ffmpeg", "-nostdin", "-v", "error",
        "-i", path,
        "-f", "s16le",  # raw PCM
        # compress just like a real radio station, because this is a real radio station
        "-af", "acompressor=threshold=-18dB:ratio=4:attack=5:release=100,alimiter=limit=0.0dB",
        "-ar", str(SAMPLE_RATE),
        "-ac", str(CHANNELS),
        "pipe:1",
    ]
    try:
        decoder = subprocess.Popen(
            decoder_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            bufsize=0,
        )
    except Exception as e:
        print("[stream_file] failed to start decoder:", e)
        return False

    stop_event = threading.Event()
    pump_thread = threading.Thread(
        target=_pump_decoder_to_wrapper,
        args=(decoder.stdout, stop_event),
        daemon=True,
    )
    pump_thread.start()

    try:
        while True:
            ret = decoder.poll()
            if ret is not None:
                # Decoder exited; let pump flush briefly
                pump_thread.join(timeout=1)
                write_silence_ms()
                return ret == 0

            # Abort on global stop, or on skip when skipping is allowed.
            # skip_flag is not cleared here; the worker clears it under lock.
            if stop_flag or (allow_skip and skip_flag):
                _abort_decoder(decoder, stop_event, pump_thread)
                return False

            time.sleep(0.05)
    except Exception as e:
        print("[stream_file] Exception:", e)
        _abort_decoder(decoder, stop_event, pump_thread, force=True)
        return False


# -----------------------------
# Discover tracks (at startup)
# -----------------------------
def discover_tracks(use_hash=False, save_every_n=200):
    """
    Walk MUSIC_DIR, reuse DB rows when mtime+size match.
    Probe/extract only for new/changed files. Log progress every
    ``save_every_n`` files, then prune DB rows for files no longer present.
    """
    init_db()
    TRACKS.clear()
    seen = set()
    count = 0

    for root, _dirs, files in os.walk(MUSIC_DIR):
        for name in files:
            if not name.lower().endswith(AUDIO_EXTENSIONS):
                continue
            full = os.path.join(root, name)
            rel = os.path.relpath(full, MUSIC_DIR)
            seen.add(rel)

            try:
                st = os.stat(full)
                mtime, size = int(st.st_mtime), st.st_size
            except OSError:
                mtime, size = None, None

            cached = db_get_entry(rel)
            reuse = False
            duration = None
            art = None

            if cached and cached.get("mtime") == mtime and cached.get("size") == size:
                duration = cached.get("duration")
                art = cached.get("album_art")
                reuse = True
            elif cached and use_hash:
                # compute sha1 only when mtime/size differ and use_hash requested
                sha_now = compute_sha1(full)
                if sha_now and sha_now == cached.get("sha1"):
                    duration = cached.get("duration")
                    art = cached.get("album_art")
                    # update mtime/size to current values
                    db_upsert_entry(rel, mtime, size, duration, art, sha_now)
                    reuse = True

            if not reuse:
                duration = probe_duration(full)
                art = extract_album_art(full, rel)
                sha = compute_sha1(full) if use_hash else None
                db_upsert_entry(rel, mtime, size, duration, art, sha)

            TRACKS[rel] = {
                "id": rel,
                "filename": name,
                "path": rel,
                "duration": duration,
                "album_art": art,
            }

            count += 1
            if save_every_n and count % save_every_n == 0:
                # small progress log
                app.logger.info("discover_tracks: processed %d files", count)

    # prune stale DB rows (entries not seen in this scan)
    prune_stale_db_entries(seen)
    app.logger.info("discover_tracks: finished, total=%d", len(TRACKS))


# -----------------------------
# Playback Worker
# -----------------------------
def _play_station_id_if_due():
    """Play a station ID when the time or song-count threshold is reached."""
    global current_track, last_station_id_time, songs_since_last_id

    # Station ID condition: whichever comes first (time or songs)
    with queue_lock:
        time_elapsed = time.time() - last_station_id_time
        songs_count = songs_since_last_id
    if time_elapsed < STATION_ID_INTERVAL and songs_count < STATION_ID_SONG_THRESHOLD:
        return

    id_file = pick_station_id_file()
    with queue_lock:
        # Reset the counter even without ID files so we don't re-check every loop.
        songs_since_last_id = 0
        if id_file:
            current_track = None
    if id_file:
        set_rds("RT HushPupPi Station ID")
        stream_file(id_file, allow_skip=False)
        write_silence(0.25)
    # Restart the timer after playing (or when there is nothing to play).
    last_station_id_time = time.time()


def worker():
    """Playback loop: keep the queue filled and play its head track forever."""
    global current_track, stop_flag, skip_flag, songs_since_last_id

    while True:
        with queue_lock:
            empty = not queue
            if empty:
                current_track = None

        if empty:
            track = pick_random_track()
            if track:
                print(f"[WORKER] Queue empty — adding random track: {track}")
                with queue_lock:
                    queue.append(track)
                    stop_flag = False

        try:
            if stop_flag:
                with queue_lock:
                    current_track = None
                    queue.clear()
                write_silence(0.25)
                time.sleep(0.5)
                continue

            _play_station_id_if_due()

            # Shuffle only when building the play order
            if shuffle_mode:
                with queue_lock:
                    random.shuffle(queue)

            # Keep the played track in a local: /stop may reset the global
            # current_track while the track is still streaming.
            with queue_lock:
                track = queue[0] if queue else None
                current_track = track
            path_current = os.path.join(MUSIC_DIR, track) if track else None
            set_rds(f"RT {track}")

            # Repeat_one: loop the first track
            if repeat_mode == "repeat_one":
                played_ok = stream_file(path_current) if path_current else False
                write_silence(0.25)
                with queue_lock:
                    if skip_flag:
                        # Clear the flag and advance; otherwise the still-set
                        # flag would abort every replay of the same track.
                        skip_flag = False
                        if not played_ok and queue and queue[0] == track:
                            queue.pop(0)
                    current_track = None
                time.sleep(0.1)
                continue

            # Normal / repeat_all
            print(f"[WORKER] Starting: {track} | queue={queue} | skip={skip_flag} stop={stop_flag}")
            played_ok = stream_file(path_current) if path_current else False

            # Snapshot flags under lock to avoid races
            with queue_lock:
                local_skip = skip_flag
                local_stop = stop_flag
            print(f"[WORKER] Finished stream_file: {track} played_ok={played_ok} skip={local_skip} stop={local_stop}")

            if local_stop:
                # stop_flag handled at top of loop
                continue

            if played_ok:
                # normal completion: remove head only if it is the played track
                with queue_lock:
                    songs_since_last_id += 1
                    if queue and queue[0] == track:
                        popped = queue.pop(0)
                        print(f"[WORKER] popped after play: {popped} | queue now {queue}")
                        if repeat_mode == "repeat_all":
                            queue.append(popped)
                    current_track = None
                time.sleep(0.1)
                continue

            if local_skip:
                # not played_ok because of a skip: clear skip and pop once
                with queue_lock:
                    skip_flag = False
                    if queue and queue[0] == track:
                        popped = queue.pop(0)
                        print(f"[WORKER] Skip: popped {popped} | queue now {queue}")
                write_silence(0.25)
                continue

            if track is None:
                # there was no song to play
                write_silence(1)
                time.sleep(1)
                with queue_lock:
                    stop_flag = True
                continue

            # Real failure: log and drop the track to avoid infinite loop
            print("[WORKER] Track failed to play:", track)
            with queue_lock:
                if queue and queue[0] == track:
                    dropped = queue.pop(0)
                    print(f"[WORKER] Dropped failed track: {dropped}")
            write_silence(0.25)
            time.sleep(0.5)
        except Exception as e:
            print("[WORKER] Exception in worker loop:", e)
            time.sleep(1)


def watchdog():
    """Every 5 s, restart the radio chain unless pifmadv and the wrapper are ok."""
    # Arguments that together identify the persistent WAV wrapper; per-track
    # decoders use "-f s16le" and a file path, so they do not match all four.
    wrapper_markers = ("-f", "wav", "-i", "-")
    while True:
        time.sleep(5)

        pifm_state = "missing"
        ffmpeg_state = "missing"

        for proc in psutil.process_iter(['name', 'cmdline', 'status']):
            try:
                cmd = proc.info['cmdline'] or []
                name = proc.info['name']

                if name == "pifmadv" or (cmd and "pifmadv" in cmd[0]):
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        pifm_state = "zombie"
                    else:
                        pifm_state = "ok"

                if name == "ffmpeg" or (cmd and "ffmpeg" in cmd[0]):
                    # Only count ffmpeg processes that belong to your pipeline
                    if all(marker in cmd for marker in wrapper_markers):
                        if proc.status() == psutil.STATUS_ZOMBIE:
                            ffmpeg_state = "zombie"
                        else:
                            ffmpeg_state = "ok"
            except Exception:
                continue

        if pifm_state != "ok" or ffmpeg_state != "ok":
            with restart_lock:
                print(f"[WATCHDOG] pifmadv={pifm_state}, ffmpeg={ffmpeg_state}")
                restart_radio_chain()


# -----------------------------
# API Endpoints
# -----------------------------
@app.get("/tracks")
def list_tracks():
    """Return precomputed track metadata as a list sorted by path."""
    items = sorted(TRACKS.values(), key=lambda x: x["path"])
    return jsonify(items)


# The URL variable is required: the view takes ``name`` and /now builds
# links of the form /album_art/<file>.
@app.get("/album_art/<name>")
def album_art(name):
    """Serve a cached album art file, or 404 if it does not exist."""
    safe = os.path.join(ALBUM_ART_CACHE_DIR, name)
    if not os.path.exists(safe):
        return ("", 404)
    return send_from_directory(ALBUM_ART_CACHE_DIR, name)


@app.get("/queue")
def get_queue():
    """Return the display titles of the queued tracks, in order."""
    with queue_lock:
        return jsonify([display_title(item) for item in queue])


@app.post("/queue")
def add_to_queue():
    """Append a known track to the queue and clear the stop flag."""
    global stop_flag
    data = request.get_json(force=True)
    track = data.get("track")
    if not track:
        return jsonify({"error": "no track"}), 400

    # Validate against discovered tracks
    if track not in TRACKS:
        return jsonify({"error": "unknown track"}), 404

    with queue_lock:
        queue.append(track)
        stop_flag = False
    return jsonify({"queued": track})


@app.get("/now")
def now_playing():
    """Return the current track's display title and album art URL, if any."""
    with queue_lock:
        art = None
        if current_track and current_track in TRACKS:
            art_name = TRACKS[current_track].get("album_art")
            if art_name:
                art = f"/album_art/{art_name}"
        return jsonify({"track": display_title(current_track), "album_art": art})


@app.post("/shuffle")
def set_shuffle():
    """Enable or disable shuffle from the JSON body's ``enabled`` field."""
    global shuffle_mode
    data = request.get_json(force=True)
    shuffle_mode = bool(data.get("enabled", False))
    return jsonify({"shuffle": shuffle_mode})


@app.post("/repeat")
def set_repeat():
    """Set the repeat mode if the requested ``mode`` is a known value."""
    global repeat_mode
    data = request.get_json(force=True)
    mode = data.get("mode", "off")
    if mode in ("off", "repeat_one", "repeat_all"):
        repeat_mode = mode
    return jsonify({"repeat": repeat_mode})


@app.post("/skip")
def skip():
    """Raise the skip flag; the worker aborts the current track and clears it."""
    global skip_flag
    with queue_lock:
        skip_flag = True
    return jsonify({"status": "skipped"})


@app.post("/stop")
def stop():
    """Raise the stop flag, clear the queue and forget the current track."""
    global stop_flag, current_track
    with queue_lock:
        stop_flag = True
        queue.clear()
        current_track = None
    return jsonify({"status": "stopping"})


# -----------------------------
# Startup
# -----------------------------
def main():
    """Scan the library, start the radio chain and serve the API."""
    os.makedirs(MUSIC_DIR, exist_ok=True)
    os.makedirs(STATION_ID_DIR, exist_ok=True)
    discover_tracks()
    start_pifm()

    # play station ID at startup so the listener(s) know we're up
    id_file = pick_station_id_file()
    if id_file:
        set_rds("RT HushPupPi Station ID")
        stream_file(id_file, allow_skip=False)
        write_silence(0.25)

    threading.Thread(target=worker, daemon=True).start()
    threading.Thread(target=watchdog, daemon=True).start()

    # Reduce Flask logging noise in production
    logging.getLogger("werkzeug").setLevel(logging.ERROR)
    app.run(host="0.0.0.0", port=5001, debug=False)


if __name__ == "__main__":
    main()