You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

873 lines
27 KiB

#!/usr/bin/env python3
import os
import time
from datetime import datetime, timezone
import random
import threading
import psutil
import subprocess
import hashlib
import sqlite3
from flask import Flask, jsonify, request, send_from_directory
# -----------------------------
# Configuration
# -----------------------------
# Root directory walked by discover_tracks(); track ids are paths relative to it.
MUSIC_DIR = "/media/usb0/music"
# Directory of station-ident clips; pick_station_id_file() picks one at random.
STATION_ID_DIR = "/usr/local/share/music_api/station_ids"
# Cache lives inside the music directory (SQLite track cache + album art).
CACHE_DIR = os.path.join(MUSIC_DIR, ".cache")
ALBUM_ART_CACHE_DIR = os.path.join(CACHE_DIR, "albumart")
# Passed to pifmadv as --freq (kept as a string because it goes on a command line).
FREQ = "103.1"
# Passed to pifmadv as --ps.
STATION_NAME = "HushPup"
# Named pipe passed to pifmadv as --ctl; set_rds() writes text lines to it.
RDS_FIFO = "/tmp/pifm_rds"
# Seconds between station IDs, compared against time.time() deltas in worker().
STATION_ID_INTERVAL = 600 # 10 minutes
# Compared against songs_since_last_id in worker().
STATION_ID_SONG_THRESHOLD = 3
# Audio format for decoder/wrapper
SAMPLE_RATE = 44100
CHANNELS = 2
SAMPLE_WIDTH = 2 # bytes per sample (16-bit)
# Default pause written by write_silence_ms() after a track.
SILENCE_MS_AFTER_TRACK = 50
TRACKS = {} # key: relative path -> value: {"id","filename","path","duration","album_art"}
# SQLite file holding cached per-track metadata (see init_db()).
DB_PATH = os.path.join(CACHE_DIR, "track_cache.sqlite3")
# Held around every sqlite3 connection opened by the db_* helpers.
_db_lock = threading.Lock()
# -----------------------------
# State
# -----------------------------
# Pending tracks as relative paths (keys of TRACKS); worker() plays queue[0].
queue = []
# Relative path of the track being streamed, or None; reported by /now.
current_track = None
# Set by /stop; cleared by worker() and by POST /queue.
stop_flag = False
# Set by /skip; cleared by worker() once the skip has been handled.
skip_flag = False
# Set by /shuffle; when true worker() shuffles the queue before picking a track.
shuffle_mode = False
repeat_mode = "off" # off, repeat_one, repeat_all
# subprocess.Popen handle for pifmadv (assigned in start_pifm()).
pifm_proc = None
# Writable handle on RDS_FIFO (assigned in start_pifm(), used by set_rds()).
rds_pipe = None
# subprocess.Popen handle for the persistent ffmpeg PCM->WAV wrapper.
wav_wrap = None
# Wall-clock time of the last station ID; compared with STATION_ID_INTERVAL.
last_station_id_time = time.time()
# Compared with STATION_ID_SONG_THRESHOLD in worker().
songs_since_last_id = 0
# Guards queue/flag updates and also serialises writes to wav_wrap.stdin.
queue_lock = threading.Lock()
app = Flask(__name__)
# -----------------------------
# Helpers
# -----------------------------
def init_db():
    """Create the cache directories and the ``tracks`` table if missing.

    The connection is opened in autocommit mode (``isolation_level=None``),
    switched to WAL journaling with ``synchronous=NORMAL``, and then the
    schema statements are executed in order.
    """
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)

    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS tracks (\n"
        "path TEXT PRIMARY KEY,\n"
        "mtime INTEGER,\n"
        "size INTEGER,\n"
        "duration REAL,\n"
        "album_art TEXT,\n"
        "sha1 TEXT,\n"
        "updated_at TEXT\n"
        ");\n"
    )
    setup_statements = (
        # WAL for concurrency and faster writes
        "PRAGMA journal_mode=WAL;",
        "PRAGMA synchronous=NORMAL;",
        create_table_sql,
        "CREATE INDEX IF NOT EXISTS idx_mtime ON tracks(mtime);",
    )

    with _db_lock:
        connection = sqlite3.connect(DB_PATH, timeout=30, isolation_level=None)
        try:
            for statement in setup_statements:
                connection.execute(statement)
        finally:
            connection.close()
def db_get_entry(path):
    """Look up the cached metadata row for *path*.

    Returns a dict with the keys ``mtime``, ``size``, ``duration``,
    ``album_art`` and ``sha1``, or ``None`` when the path has no row.
    """
    columns = ("mtime", "size", "duration", "album_art", "sha1")
    with _db_lock:
        connection = sqlite3.connect(DB_PATH, timeout=30)
        try:
            record = connection.execute(
                "SELECT mtime,size,duration,album_art,sha1 FROM tracks WHERE path = ?",
                (path,),
            ).fetchone()
        finally:
            connection.close()
    if not record:
        return None
    return dict(zip(columns, record))
def db_upsert_entry(path, mtime, size, duration, album_art, sha1=None):
    """Insert or update the cache row for *path*.

    ``updated_at`` is stamped with the current UTC time in ISO-8601 form.
    Any database error propagates to the caller after the transaction has
    been rolled back.
    """
    now = datetime.now(timezone.utc).isoformat()
    upsert_sql = (
        "\n"
        "INSERT INTO tracks(path,mtime,size,duration,album_art,sha1,updated_at)\n"
        "VALUES(?,?,?,?,?,?,?)\n"
        "ON CONFLICT(path) DO UPDATE SET\n"
        "mtime=excluded.mtime,\n"
        "size=excluded.size,\n"
        "duration=excluded.duration,\n"
        "album_art=excluded.album_art,\n"
        "sha1=excluded.sha1,\n"
        "updated_at=excluded.updated_at\n"
    )
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            # The connection context manager commits on success and rolls
            # back on error. Unlike a hand-written "ROLLBACK" it does not
            # raise when no transaction was ever opened, so the original
            # exception is never masked.
            with conn:
                conn.execute(upsert_sql, (path, mtime, size, duration, album_art, sha1, now))
        finally:
            conn.close()
def db_delete_paths(paths):
    """Delete multiple paths (iterable) in a single transaction.

    Any database error propagates to the caller after the transaction has
    been rolled back.
    """
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            # Commit/rollback is delegated to the connection context manager:
            # a manual "ROLLBACK" raises when the transaction never started
            # and would hide the real failure.
            with conn:
                conn.executemany(
                    "DELETE FROM tracks WHERE path = ?",
                    ((p,) for p in paths),
                )
        finally:
            conn.close()
def prune_stale_db_entries(valid_paths_set):
    """Drop cache rows whose path is absent from *valid_paths_set*.

    Keeps the database in step with the filesystem after files are removed
    or renamed. Deletions are issued in chunks of 200 paths, each chunk in
    its own transaction.
    """
    # The lock is released before deleting: db_delete_paths() takes it again.
    with _db_lock:
        connection = sqlite3.connect(DB_PATH, timeout=30)
        try:
            known_paths = {record[0] for record in connection.execute("SELECT path FROM tracks")}
        finally:
            connection.close()

    orphaned = list(known_paths.difference(valid_paths_set))
    if not orphaned:
        return

    chunk_size = 200
    for start in range(0, len(orphaned), chunk_size):
        db_delete_paths(orphaned[start:start + chunk_size])
    app.logger.info("prune_stale_db_entries: removed %d stale entries", len(orphaned))
def open_fifo_nonblocking(path):
    """Open *path* read/write and non-blocking; return a line-buffered text writer.

    Opening with ``O_RDWR`` means the open call does not wait for a reader
    to attach to the FIFO.
    """
    flags = os.O_RDWR | os.O_NONBLOCK
    descriptor = os.open(path, flags)
    return os.fdopen(descriptor, mode="w", buffering=1)
def pick_station_id_file():
    """Return the full path of a random regular file in STATION_ID_DIR.

    Returns ``None`` when the directory does not exist or holds no files.
    """
    if not os.path.isdir(STATION_ID_DIR):
        return None
    candidates = []
    for entry in os.listdir(STATION_ID_DIR):
        if os.path.isfile(os.path.join(STATION_ID_DIR, entry)):
            candidates.append(entry)
    if not candidates:
        return None
    return os.path.join(STATION_ID_DIR, random.choice(candidates))
def pick_random_track():
    """Pick one key (relative path) of TRACKS at random; ``None`` if it is empty."""
    known_tracks = list(TRACKS)
    if not known_tracks:
        return None
    return random.choice(known_tracks)
def set_rds(text: str):
    """Write *text* (cut to 64 characters) plus a newline to the RDS control pipe.

    Best effort: does nothing when the pipe is not open, and every error
    raised while writing is swallowed.
    """
    try:
        if not rds_pipe:
            return
        line = text[:64] + "\n"
        rds_pipe.write(line)
        rds_pipe.flush()
    except Exception:
        pass
def _safe_hash_name(rel_path: str) -> str:
return hashlib.sha1(rel_path.encode("utf-8")).hexdigest() + ".jpg"
# -----------------------------
# Persistent WAV wrapper + pifmadv
# -----------------------------
def start_wav_wrapper():
    """Launch the long-lived ffmpeg that turns raw PCM on stdin into WAV on stdout.

    The wrapper's stdin stays open across tracks. If a wrapper process is
    already running this is a no-op.
    """
    global wav_wrap
    already_running = wav_wrap and wav_wrap.poll() is None
    if already_running:
        return

    # Same channel count / sample rate on both the input and the output side.
    pcm_layout = ["-ac", str(CHANNELS), "-ar", str(SAMPLE_RATE)]
    command = (
        ["ffmpeg", "-f", "s16le"]   # raw PCM input
        + pcm_layout
        + ["-i", "-"]               # stdin
        + ["-f", "wav"]             # output WAV stream
        + pcm_layout
        + ["-"]                     # stdout → pifmadv
    )
    wav_wrap = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=0,
    )
def start_pifm():
    """Bring up the transmit chain: RDS FIFO, WAV wrapper, then pifmadv.

    pifmadv reads its audio from the wrapper's stdout. Afterwards the RDS
    control FIFO is opened for writing and stored in ``rds_pipe``.
    """
    global pifm_proc, rds_pipe, wav_wrap

    # An already existing FIFO is fine; mkfifo reports it as FileExistsError.
    try:
        os.mkfifo(RDS_FIFO)
    except FileExistsError:
        pass

    start_wav_wrapper()

    # Replace these args with your actual pifmadv invocation
    options = (
        ("--freq", FREQ),
        ("--pi", "beef"),
        ("--pty", "9"),
        ("--ps", STATION_NAME),
        ("--ctl", RDS_FIFO),
        ("--power", "7"),
        ("--preemph", "us"),
        ("--cutoff", "20500"),
        ("--audio", "-"),
    )
    command = ["pifmadv"]
    for flag, value in options:
        command.extend((flag, value))

    # Start pifmadv reading from wrapper stdout
    pifm_proc = subprocess.Popen(
        command,
        stdin=wav_wrap.stdout,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    # Close wrapper stdout in parent so pifm owns the read end
    try:
        wav_wrap.stdout.close()
    except Exception:
        pass

    time.sleep(0.1)
    rds_pipe = open_fifo_nonblocking(RDS_FIFO)
def write_silence(seconds: float):
    """Feed *seconds* of zeroed raw PCM into the WAV wrapper's stdin.

    Does nothing when the wrapper is not running. Write errors (including a
    broken pipe) are swallowed.
    """
    global wav_wrap
    if not wav_wrap or wav_wrap.poll() is not None:
        return

    frame_count = int(SAMPLE_RATE * seconds)
    one_frame = b"\x00" * SAMPLE_WIDTH * CHANNELS
    payload = one_frame * frame_count

    try:
        # queue_lock doubles as the writer lock so threads do not interleave
        # their writes on the wrapper's stdin.
        with queue_lock:
            wav_wrap.stdin.write(payload)
            wav_wrap.stdin.flush()
    except Exception:
        pass
def _safe_art_name(rel_path: str) -> str:
"""Stable filesystem-safe name for album art based on relative path."""
h = hashlib.sha1(rel_path.encode("utf-8")).hexdigest()
return f"{h}.jpg"
def probe_duration(path: str, timeout: int = 6):
    """Ask ffprobe for the container duration of *path*.

    Returns the duration in seconds as a float, or ``None`` when ffprobe
    fails, times out, prints nothing, or prints something that is not a
    number.
    """
    command = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=timeout,
        )
        reported = completed.stdout.strip()
        return float(reported) if reported else None
    except Exception:
        return None
def extract_album_art(path: str, rel_path: str, timeout: int = 10):
    """
    Extract embedded album art to ALBUM_ART_CACHE_DIR and return cache filename,
    or None if no art or extraction failed.

    A previously cached, non-empty file is reused without running ffmpeg.
    Extraction only counts as successful when ffmpeg exits with status 0 and
    leaves a non-empty file. Anything else (timeout, non-zero exit, ffmpeg
    missing) removes whatever was written, so a truncated image is never
    mistaken for valid cached art on a later scan.
    """
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)
    art_name = _safe_art_name(rel_path)
    art_path = os.path.join(ALBUM_ART_CACHE_DIR, art_name)

    def _has_content():
        # getsize raises OSError when the file is missing
        try:
            return os.path.getsize(art_path) > 0
        except OSError:
            return False

    # If already cached, return immediately
    if _has_content():
        return art_name

    # Try to extract the first video stream / attached pic.
    # Use ffmpeg to write a jpeg; if none exists, ffmpeg will exit nonzero.
    extracted = False
    try:
        # -y overwrite, -v error quiets output
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-v", "error",
                "-i", path,
                "-an", "-vcodec", "copy",
                art_path,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=False,
        )
        extracted = result.returncode == 0 and _has_content()
    except (OSError, ValueError, subprocess.SubprocessError):
        # ffmpeg missing, bad argument, or timed out: treat as "no art"
        extracted = False

    if extracted:
        return art_name

    # Cleanup any empty or partially written file
    try:
        os.remove(art_path)
    except OSError:
        pass
    return None
# -----------------------------
# Pump helper for decoder -> wrapper
# -----------------------------
def _pump_decoder_to_wrapper(decoder_stdout, stop_event):
"""
Copy bytes from decoder_stdout to wrapper_proc.stdin until EOF or stop_event.
Do NOT close wrapper_proc.stdin when done.
"""
global wav_wrap
try:
bufsize = 64 * 1024
while not stop_event.is_set():
chunk = decoder_stdout.read(bufsize)
if not chunk:
break
try:
with queue_lock:
if wav_wrap and wav_wrap.poll() is None:
wav_wrap.stdin.write(chunk)
wav_wrap.stdin.flush()
else:
# wrapper died; stop pumping
break
except BrokenPipeError:
break
except Exception:
break
except Exception:
pass
finally:
try:
decoder_stdout.close()
except Exception:
pass
def write_silence_ms(ms: int = SILENCE_MS_AFTER_TRACK):
    """Write *ms* milliseconds of silence via write_silence()."""
    seconds = ms / 1000.0
    write_silence(seconds)
def is_zombie(proc):
    """Return True when *proc* reports zombie status or no longer exists."""
    try:
        status = proc.status()
    except psutil.NoSuchProcess:
        return True
    return status == psutil.STATUS_ZOMBIE
def restart_radio_chain():
    """Kill every ffmpeg and pifmadv process, restart the chain, then air a station ID."""
    print("[WATCHDOG] Restarting radio chain")

    def _kill_all(program):
        # Match on the process name or on the executable in argv[0].
        for proc in psutil.process_iter(['name', 'cmdline']):
            try:
                argv = proc.info['cmdline'] or []
                if proc.info['name'] == program or (argv and program in argv[0]):
                    print(f"[WATCHDOG] Killing {program} PID {proc.pid}")
                    proc.kill()
            except Exception:
                pass

    # ffmpeg first, then pifmadv
    _kill_all("ffmpeg")
    _kill_all("pifmadv")

    time.sleep(0.5)
    print("[WATCHDOG] Restarting pifmadv")
    start_pifm()

    # Your addition: broadcast station ID
    recovery_id = pick_station_id_file()
    if recovery_id:
        print(f"[WATCHDOG] Broadcasting recovery ID: {recovery_id}")
        stream_file(recovery_id, allow_skip=False)
        write_silence(0.25)
# -----------------------------
# Robust stream_file implementation
# -----------------------------
def stream_file(path: str, allow_skip=True) -> bool:
    """
    Decode *path* with ffmpeg to raw PCM (s16le, CHANNELS, SAMPLE_RATE) and
    pump it into the persistent WAV wrapper.

    Returns True only when the decoder exits with code 0. Returns False when
    the wrapper is not running, the decoder cannot be started or fails, or
    playback is aborted by ``stop_flag`` / ``skip_flag`` (the latter only
    when *allow_skip* is true).
    """
    global wav_wrap, stop_flag, skip_flag

    if not wav_wrap or wav_wrap.poll() is not None:
        print("wav_wrap is not running")
        return False

    # Decoder output must match what the wrapper expects on stdin.
    decoder_args = [
        "ffmpeg", "-nostdin", "-v", "error",
        "-i", path,
        "-f", "s16le",  # raw PCM
        "-af", "acompressor=threshold=-18dB:ratio=4:attack=5:release=100,alimiter=limit=0.0dB",  # compress just like a real radio station, because this is a real radio station
        "-ar", str(SAMPLE_RATE),
        "-ac", str(CHANNELS),
        "pipe:1",
    ]
    try:
        decoder = subprocess.Popen(
            decoder_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            bufsize=0,
        )
    except Exception as e:
        print("[stream_file] failed to start decoder:", e)
        return False

    stop_event = threading.Event()
    pump_thread = threading.Thread(
        target=_pump_decoder_to_wrapper,
        args=(decoder.stdout, stop_event),
        daemon=True,
    )
    pump_thread.start()

    def _halt(end_decoder):
        # Shared abort path: end the decoder, stop the pump, pad with silence.
        try:
            end_decoder()
        except Exception:
            pass
        stop_event.set()
        pump_thread.join(timeout=1)
        write_silence_ms()
        return False

    try:
        while True:
            exit_code = decoder.poll()
            if exit_code is not None:
                # Decoder exited; let pump flush briefly
                pump_thread.join(timeout=1)
                write_silence_ms(50)
                return exit_code == 0

            # Global stop always aborts; skip only when the caller allows it.
            # skip_flag is not cleared here; worker will clear under lock.
            if stop_flag or (allow_skip and skip_flag):
                return _halt(decoder.terminate)

            time.sleep(0.05)
    except Exception as e:
        print("[stream_file] Exception:", e)
        return _halt(decoder.kill)
# -----------------------------
# Discover tracks (at startup)
# -----------------------------
def discover_tracks(use_hash=False, save_every_n=200):
    """
    Rebuild TRACKS by walking MUSIC_DIR.

    A cached DB row is reused when its mtime and size match the file (or,
    with *use_hash*, when the SHA-1 still matches). Otherwise the file is
    probed, its album art extracted and the row rewritten. A progress line
    is logged every *save_every_n* files, and rows for files not seen in
    this scan are pruned at the end.

    NOTE(review): ``compute_sha1`` is not defined in the visible part of this
    file — confirm it exists before calling with ``use_hash=True``.
    """
    audio_suffixes = (".mp3", ".flac", ".wav", ".m4a", ".ogg", ".aac")

    def _load_metadata(full, rel, mtime, size):
        # Returns (duration, album_art), preferring the cached row.
        cached = db_get_entry(rel)
        if cached:
            if cached.get("mtime") == mtime and cached.get("size") == size:
                return cached.get("duration"), cached.get("album_art")
            if use_hash:
                # compute sha1 only when mtime/size differ and use_hash requested
                sha_now = compute_sha1(full)
                if sha_now and sha_now == cached.get("sha1"):
                    duration = cached.get("duration")
                    art = cached.get("album_art")
                    # update mtime/size to current values
                    db_upsert_entry(rel, mtime, size, duration, art, sha_now)
                    return duration, art
        duration = probe_duration(full)
        art = extract_album_art(full, rel)
        sha = compute_sha1(full) if use_hash else None
        db_upsert_entry(rel, mtime, size, duration, art, sha)
        return duration, art

    init_db()
    TRACKS.clear()
    seen = set()
    processed = 0

    for root, _dirs, files in os.walk(MUSIC_DIR):
        for name in files:
            if not name.lower().endswith(audio_suffixes):
                continue
            full = os.path.join(root, name)
            rel = os.path.relpath(full, MUSIC_DIR)
            seen.add(rel)

            # stat
            try:
                info = os.stat(full)
                mtime, size = int(info.st_mtime), info.st_size
            except Exception:
                mtime, size = None, None

            duration, art = _load_metadata(full, rel, mtime, size)
            TRACKS[rel] = {
                "id": rel,
                "filename": name,
                "path": rel,
                "duration": duration,
                "album_art": art,
            }

            processed += 1
            if save_every_n and processed % save_every_n == 0:
                # small progress log
                app.logger.info("discover_tracks: processed %d files", processed)

    # prune stale DB rows (entries not seen in this scan)
    prune_stale_db_entries(seen)
    app.logger.info("discover_tracks: finished, total=%d", len(TRACKS))
# -----------------------------
# Playback Worker
# -----------------------------
def worker():
    """Playback loop; runs forever in a background thread.

    Each iteration:
      1. If the queue is empty, enqueue a random track (this also clears
         ``stop_flag``).
      2. Honour a pending stop by clearing the queue.
      3. Air a station ID when STATION_ID_INTERVAL seconds have passed or
         STATION_ID_SONG_THRESHOLD songs have completed, whichever is first.
      4. Play the head of the queue and then, depending on the outcome:
         - finished: pop it (re-append for ``repeat_all``, leave it in place
           for ``repeat_one``) and count it towards the next station ID;
         - skipped: clear ``skip_flag`` and pop it, in every repeat mode;
         - failed: drop it so a broken file cannot loop forever.

    The repeat mode is read once, after the track ends. Nothing is ever
    called while ``queue_lock`` is held that takes the lock itself
    (``write_silence`` and ``stream_file`` do).
    """
    global current_track, stop_flag, skip_flag, last_station_id_time, songs_since_last_id
    global queue, shuffle_mode, repeat_mode

    while True:
        with queue_lock:
            empty = not queue
            if empty:
                current_track = None
        if empty:
            track = pick_random_track()
            if track:
                print(f"[WORKER] Queue empty — adding random track: {track}")
                with queue_lock:
                    queue.append(track)
                    stop_flag = False

        try:
            if stop_flag:
                with queue_lock:
                    current_track = None
                    queue.clear()
                write_silence(0.25)
                time.sleep(0.5)
                continue

            # Station ID condition: whichever comes first (time or songs)
            now = time.time()
            with queue_lock:
                time_elapsed = now - last_station_id_time
                songs_count = songs_since_last_id
            if time_elapsed >= STATION_ID_INTERVAL or songs_count >= STATION_ID_SONG_THRESHOLD:
                id_file = pick_station_id_file()
                with queue_lock:
                    # Reset the counter even without ID files so we don't spin
                    songs_since_last_id = 0
                    if id_file:
                        current_track = None
                if id_file:
                    set_rds("RT HushPupPi Station ID")
                    stream_file(id_file, allow_skip=False)
                    write_silence(0.25)
                # Reset the timer after playing (or after finding nothing to play)
                last_station_id_time = time.time()

            # Shuffle only when building the play order
            if shuffle_mode:
                with queue_lock:
                    random.shuffle(queue)

            # Snapshot the head *before* playing; it is what the later
            # "does the head still match" checks compare against.
            with queue_lock:
                head = queue[0] if queue else None
                current_track = head
            if head is None:
                # Nothing to play (no tracks discovered, or the queue was
                # cleared meanwhile): idle instead of raising IndexError.
                time.sleep(1)
                continue

            set_rds(f"RT {head}")
            print(f"[WORKER] Starting: {head} | queue={queue} | skip={skip_flag} stop={stop_flag}")
            played_ok = stream_file(os.path.join(MUSIC_DIR, head))

            # Snapshot flags under lock to avoid races
            with queue_lock:
                local_skip = skip_flag
                local_stop = stop_flag
                mode = repeat_mode
            print(f"[WORKER] Finished stream_file: {head} played_ok={played_ok} skip={local_skip} stop={local_stop}")

            if local_stop:
                # stop_flag handled at top of loop
                continue

            if played_ok:
                with queue_lock:
                    # Count completed songs so the song-based station ID
                    # threshold can actually trigger.
                    songs_since_last_id += 1
                    # repeat_one keeps the head in place; otherwise remove
                    # it, but only if it is still the track we played.
                    if mode != "repeat_one" and queue and queue[0] == head:
                        popped = queue.pop(0)
                        print(f"[WORKER] popped after play: {popped} | queue now {queue}")
                        if mode == "repeat_all":
                            queue.append(popped)
                if mode == "repeat_one":
                    write_silence(0.25)
            elif local_skip:
                # Skip: clear the flag and advance, in every repeat mode,
                # so a set flag cannot abort every following track.
                with queue_lock:
                    skip_flag = False
                    if queue and queue[0] == head:
                        popped = queue.pop(0)
                        print(f"[WORKER] Skip: popped {popped} | queue now {queue}")
                write_silence(0.25)
            else:
                # Real failure: log and drop the track to avoid infinite loop
                print("[WORKER] Track failed to play:", head)
                with queue_lock:
                    if queue and queue[0] == head:
                        dropped = queue.pop(0)
                        print(f"[WORKER] Dropped failed track: {dropped}")
                write_silence(0.25)
                time.sleep(0.5)

            with queue_lock:
                current_track = None
            time.sleep(0.1)
        except Exception as e:
            print("[WORKER] Exception in worker loop:", e)
            time.sleep(1)
restart_lock = threading.Lock()


def watchdog():
    """Every 5 seconds, check that pifmadv and the WAV wrapper are healthy.

    Each is classified as "ok", "zombie" or "missing" by scanning the
    process table. If either is not "ok" the whole chain is restarted under
    ``restart_lock``.
    """
    # Arguments that together identify the persistent PCM->WAV wrapper.
    # Decoder and album-art ffmpeg runs also use "-f"/"-i", so every token
    # must be present, not just one of them.
    wrapper_tokens = ("-f", "wav", "-i", "-")

    while True:
        time.sleep(5)
        pifm_state = "missing"
        ffmpeg_state = "missing"
        for proc in psutil.process_iter(['name', 'cmdline', 'status']):
            try:
                cmd = proc.info['cmdline'] or []
                name = proc.info['name']
                if name == "pifmadv" or (cmd and "pifmadv" in cmd[0]):
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        pifm_state = "zombie"
                    else:
                        pifm_state = "ok"
                if name == "ffmpeg" or (cmd and "ffmpeg" in cmd[0]):
                    # Only count ffmpeg processes that belong to your pipeline
                    if all(token in cmd for token in wrapper_tokens):
                        if proc.status() == psutil.STATUS_ZOMBIE:
                            ffmpeg_state = "zombie"
                        else:
                            ffmpeg_state = "ok"
            except Exception:
                # Process vanished or is inaccessible; ignore it
                continue
        if pifm_state != "ok" or ffmpeg_state != "ok":
            with restart_lock:
                print(f"[WATCHDOG] pifmadv={pifm_state}, ffmpeg={ffmpeg_state}")
                restart_radio_chain()
# -----------------------------
# API Endpoints
# -----------------------------
@app.get("/tracks")
def list_tracks():
    """Return the metadata of every discovered track, sorted by relative path."""
    ordered = sorted(TRACKS.values(), key=lambda meta: meta["path"])
    return jsonify(ordered)
@app.get("/album_art/<name>")
def album_art(name):
    """Serve a cached album-art file, or an empty 404 when it is not on disk."""
    candidate = os.path.join(ALBUM_ART_CACHE_DIR, name)
    if os.path.exists(candidate):
        return send_from_directory(ALBUM_ART_CACHE_DIR, name)
    return ("", 404)
@app.get("/queue")
def get_queue():
    """Return a copy of the current queue as a JSON list."""
    with queue_lock:
        snapshot = queue[:]
        return jsonify(snapshot)
@app.post("/queue")
def add_to_queue():
    """Append a known track to the queue and clear any pending stop.

    Expects a JSON object ``{"track": "<relative path>"}``.
    Responds 400 when the body is not a JSON object or has no track, and
    404 when the track is not one of the discovered TRACKS.
    """
    global queue, stop_flag
    # silent=True: malformed JSON yields None instead of raising, so every
    # bad body is answered with the same JSON error shape.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "invalid JSON body"}), 400
    track = data.get("track")
    if not track:
        return jsonify({"error": "no track"}), 400
    # Validate against discovered tracks. Keys are strings, so a non-string
    # value (possibly unhashable) can never match.
    if not isinstance(track, str) or track not in TRACKS:
        return jsonify({"error": "unknown track"}), 404
    with queue_lock:
        queue.append(track)
        stop_flag = False
    return jsonify({"queued": track})
@app.get("/now")
def now_playing():
    """Report the current track and, when it has cached art, its ``/album_art`` URL."""
    with queue_lock:
        playing = current_track
        meta = TRACKS.get(playing) if playing else None
        art_name = meta.get("album_art") if meta is not None else None
        art_url = f"/album_art/{art_name}" if art_name else None
        return jsonify({"track": playing, "album_art": art_url})
@app.post("/shuffle")
def set_shuffle():
    """Enable or disable shuffle mode.

    Expects a JSON object ``{"enabled": <bool>}``; a missing key means
    False. The strings "false", "0", "no" and "off" (any case) are treated
    as False rather than as truthy non-empty strings. Responds 400 when the
    body is not a JSON object.
    """
    global shuffle_mode
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "invalid JSON body"}), 400
    enabled = data.get("enabled", False)
    if isinstance(enabled, str):
        # bool("false") is True, so spell out the falsy spellings
        enabled = enabled.strip().lower() not in ("", "false", "0", "no", "off")
    shuffle_mode = bool(enabled)
    return jsonify({"shuffle": shuffle_mode})
@app.post("/repeat")
def set_repeat():
    """Set the repeat mode.

    Expects a JSON object ``{"mode": "off" | "repeat_one" | "repeat_all"}``;
    a missing key means "off". An unrecognised mode leaves the current mode
    unchanged. The response always reports the mode now in effect.
    Responds 400 when the body is not a JSON object.
    """
    global repeat_mode
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "invalid JSON body"}), 400
    mode = data.get("mode", "off")
    if mode in ("off", "repeat_one", "repeat_all"):
        repeat_mode = mode
    return jsonify({"repeat": repeat_mode})
@app.post("/skip")
def skip():
    """Raise ``skip_flag``; the playback side aborts the track and clears the flag."""
    global skip_flag
    response = {"status": "skipped"}
    with queue_lock:
        skip_flag = True
    return jsonify(response)
@app.post("/stop")
def stop():
    """Raise ``stop_flag``, empty the queue and forget the current track."""
    global stop_flag, queue, current_track
    response = {"status": "stopping"}
    with queue_lock:
        stop_flag = True
        current_track = None
        del queue[:]
    return jsonify(response)
# -----------------------------
# Startup
# -----------------------------
def main():
    """Scan the library, start the transmit chain and serve the HTTP API."""
    for directory in (MUSIC_DIR, STATION_ID_DIR):
        os.makedirs(directory, exist_ok=True)

    discover_tracks()
    start_pifm()

    # play station ID at startup so the listener(s) know we're up
    startup_id = pick_station_id_file()
    if startup_id:
        set_rds("RT HushPupPi Station ID")
        stream_file(startup_id, allow_skip=False)
        write_silence(0.25)

    for background_task in (worker, watchdog):
        threading.Thread(target=background_task, daemon=True).start()

    # Reduce Flask logging noise in production
    import logging
    logging.getLogger("werkzeug").setLevel(logging.ERROR)
    app.run(host="0.0.0.0", port=5001, debug=False)


if __name__ == "__main__":
    main()