You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
887 lines
28 KiB
887 lines
28 KiB
#!/usr/bin/env python3 |
|
import os |
|
import time |
|
from datetime import datetime, timezone |
|
import random |
|
import threading |
|
import psutil |
|
import subprocess |
|
import hashlib |
|
import sqlite3 |
|
from flask import Flask, jsonify, request, send_from_directory |
|
|
|
# -----------------------------
# Configuration
# -----------------------------
MUSIC_DIR = "/media/usb0/music"          # root of the music library (USB stick)
STATION_ID_DIR = "/usr/local/share/music_api/station_ids"  # station-ID audio clips
CACHE_DIR = os.path.join(MUSIC_DIR, ".cache")              # metadata cache lives beside the music
ALBUM_ART_CACHE_DIR = os.path.join(CACHE_DIR, "albumart")  # extracted cover images
FREQ = "103.1"               # FM frequency (MHz) passed to pifmadv
STATION_NAME = "HushPup"     # RDS PS (programme service) name
RDS_FIFO = "/tmp/pifm_rds"   # FIFO used to push RDS text to pifmadv at runtime

# Play a station ID when either limit is reached, whichever comes first.
STATION_ID_INTERVAL = 600  # 10 minutes
STATION_ID_SONG_THRESHOLD = 3

# Audio format for decoder/wrapper
SAMPLE_RATE = 44100
CHANNELS = 2
SAMPLE_WIDTH = 2  # bytes per sample (16-bit)
SILENCE_MS_AFTER_TRACK = 50

TRACKS = {}  # key: relative path -> value: {"id","filename","path","duration","album_art"}
DB_PATH = os.path.join(CACHE_DIR, "track_cache.sqlite3")  # sqlite metadata cache
_db_lock = threading.Lock()  # serializes every sqlite connection open/use

# -----------------------------
# State
# -----------------------------
queue = []            # upcoming tracks (relative paths); guarded by queue_lock
current_track = None  # relative path of the track now streaming, or None
stop_flag = False     # request: stop playback and clear the queue
skip_flag = False     # request: abort only the currently-streaming track

shuffle_mode = False
repeat_mode = "off"  # off, repeat_one, repeat_all

pifm_proc = None  # pifmadv subprocess handle
rds_pipe = None   # non-blocking writable handle on RDS_FIFO
wav_wrap = None   # persistent ffmpeg PCM->WAV wrapper subprocess
last_station_id_time = time.time()
songs_since_last_id = 0

# Guards queue/current_track/flags AND serializes writes into wav_wrap.stdin.
queue_lock = threading.Lock()

app = Flask(__name__)
|
|
|
# ----------------------------- |
|
# Helpers |
|
# ----------------------------- |
|
def init_db():
    """Create the album-art cache dir and the sqlite track table if absent."""
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)
    with _db_lock:
        db = sqlite3.connect(DB_PATH, timeout=30, isolation_level=None)
        try:
            # WAL for concurrency and faster writes
            for pragma in ("PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;"):
                db.execute(pragma)
            db.execute("""
            CREATE TABLE IF NOT EXISTS tracks (
                path TEXT PRIMARY KEY,
                mtime INTEGER,
                size INTEGER,
                duration REAL,
                album_art TEXT,
                sha1 TEXT,
                updated_at TEXT
            );
            """)
            db.execute("CREATE INDEX IF NOT EXISTS idx_mtime ON tracks(mtime);")
        finally:
            db.close()
|
|
|
def db_get_entry(path):
    """Return the cached row for *path* as a dict, or None when not cached."""
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            row = conn.execute(
                "SELECT mtime,size,duration,album_art,sha1 FROM tracks WHERE path = ?",
                (path,),
            ).fetchone()
        finally:
            conn.close()
    if not row:
        return None
    fields = ("mtime", "size", "duration", "album_art", "sha1")
    return dict(zip(fields, row))
|
|
|
def db_upsert_entry(path, mtime, size, duration, album_art, sha1=None):
    """Insert or refresh one track row, stamping updated_at with current UTC time."""
    now = datetime.now(timezone.utc).isoformat()
    params = (path, mtime, size, duration, album_art, sha1, now)
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            # Explicit transaction so the upsert is atomic.
            conn.execute("BEGIN")
            conn.execute("""
            INSERT INTO tracks(path,mtime,size,duration,album_art,sha1,updated_at)
            VALUES(?,?,?,?,?,?,?)
            ON CONFLICT(path) DO UPDATE SET
                mtime=excluded.mtime,
                size=excluded.size,
                duration=excluded.duration,
                album_art=excluded.album_art,
                sha1=excluded.sha1,
                updated_at=excluded.updated_at
            """, params)
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        finally:
            conn.close()
|
|
|
def db_delete_paths(paths):
    """Delete multiple paths (iterable) in a single transaction."""
    rows = [(p,) for p in paths]
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            conn.execute("BEGIN")
            conn.executemany("DELETE FROM tracks WHERE path = ?", rows)
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        finally:
            conn.close()
|
|
|
def prune_stale_db_entries(valid_paths_set):
    """
    Remove DB rows whose path is not in valid_paths_set.
    This keeps the DB clean when files are removed or renamed.
    """
    # Snapshot all known paths, then diff against the live set.
    with _db_lock:
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            known = {r[0] for r in conn.execute("SELECT path FROM tracks").fetchall()}
        finally:
            conn.close()

    stale = known - set(valid_paths_set)
    if not stale:
        return

    # Delete in groups of 200 to bound transaction size.
    stale_list = list(stale)
    for start in range(0, len(stale_list), 200):
        db_delete_paths(stale_list[start:start + 200])

    app.logger.info("prune_stale_db_entries: removed %d stale entries", len(stale))
|
|
|
def open_fifo_nonblocking(path):
    """Open *path* read+write and non-blocking, as a line-buffered text stream.

    O_RDWR keeps the FIFO open even when no reader is attached yet.
    """
    flags = os.O_RDWR | os.O_NONBLOCK
    return os.fdopen(os.open(path, flags), "w", buffering=1)
|
|
|
def pick_station_id_file():
    """Return the absolute path of a random station-ID clip, or None if none exist."""
    if not os.path.isdir(STATION_ID_DIR):
        return None
    candidates = [
        os.path.join(STATION_ID_DIR, entry)
        for entry in os.listdir(STATION_ID_DIR)
        if os.path.isfile(os.path.join(STATION_ID_DIR, entry))
    ]
    return random.choice(candidates) if candidates else None
|
|
|
def pick_random_track():
    """Return a random music track (relative path) from TRACKS, or None if empty."""
    keys = list(TRACKS)
    return random.choice(keys) if keys else None
|
|
|
def set_rds(text: str):
    """Best-effort push of RDS text (truncated to 64 chars) into the pifmadv FIFO."""
    global rds_pipe
    try:
        if not rds_pipe:
            return
        rds_pipe.write(text[:64] + "\n")
        rds_pipe.flush()
    except Exception:
        # FIFO full or reader gone; RDS text is non-critical, so drop it.
        pass
|
|
|
def _safe_hash_name(rel_path: str) -> str: |
|
return hashlib.sha1(rel_path.encode("utf-8")).hexdigest() + ".jpg" |
|
|
|
# ----------------------------- |
|
# Persistent WAV wrapper + pifmadv |
|
# ----------------------------- |
|
def start_wav_wrapper():
    """
    Start a persistent ffmpeg process that reads raw PCM from stdin and
    writes a WAV stream to stdout. The wrapper stdin remains open across tracks.
    """
    global wav_wrap
    # Already alive? Leave the existing wrapper in place.
    if wav_wrap is not None and wav_wrap.poll() is None:
        return

    cmd = ["ffmpeg"]
    cmd += ["-f", "s16le"]                              # raw PCM input
    cmd += ["-ac", str(CHANNELS), "-ar", str(SAMPLE_RATE)]
    cmd += ["-i", "-"]                                  # read from stdin
    cmd += ["-f", "wav"]                                # emit a WAV stream
    cmd += ["-ac", str(CHANNELS), "-ar", str(SAMPLE_RATE)]
    cmd += ["-"]                                        # stdout → pifmadv

    # bufsize=0: unbuffered pipes so audio flows with minimal latency.
    wav_wrap = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=0,
    )
|
|
|
def start_pifm():
    """
    Start the persistent wrapper and pifmadv. pifmadv reads from wrapper stdout.

    Side effects: (re)creates RDS_FIFO, sets the module globals pifm_proc and
    rds_pipe, and ensures wav_wrap is running.
    """
    global pifm_proc, rds_pipe, wav_wrap

    # Create the RDS control FIFO if missing; tolerate a concurrent creator.
    if not os.path.exists(RDS_FIFO):
        try:
            os.mkfifo(RDS_FIFO)
        except FileExistsError:
            pass

    # No-op when the ffmpeg PCM->WAV wrapper is already alive.
    start_wav_wrapper()

    # Replace these args with your actual pifmadv invocation
    pifm_args = [
        "pifmadv",
        "--freq", FREQ,
        "--pi", "beef",        # RDS programme identification (hex)
        "--pty", "9",          # RDS programme type code
        "--ps", STATION_NAME,
        "--ctl", RDS_FIFO,     # runtime RDS text arrives over this FIFO
        "--power", "7",
        "--preemph", "us",     # US pre-emphasis curve
        "--cutoff", "20500",
        "--audio", "-",        # WAV audio on stdin (from the wrapper)
    ]

    # Start pifmadv reading from wrapper stdout
    pifm_proc = subprocess.Popen(
        pifm_args,
        stdin=wav_wrap.stdout,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

    # Close wrapper stdout in parent so pifm owns the read end
    try:
        wav_wrap.stdout.close()
    except Exception:
        pass

    # Brief pause so pifmadv can open the FIFO before we attach the writer end.
    time.sleep(0.1)
    rds_pipe = open_fifo_nonblocking(RDS_FIFO)
|
|
|
def write_silence(seconds: float):
    """Feed raw PCM silence into the WAV wrapper (non-blocking)."""
    global wav_wrap
    if wav_wrap is None or wav_wrap.poll() is not None:
        return
    one_frame = b"\x00" * (SAMPLE_WIDTH * CHANNELS)
    payload = one_frame * int(SAMPLE_RATE * seconds)
    try:
        # Serialize against the pump thread, which also writes under queue_lock.
        with queue_lock:
            wav_wrap.stdin.write(payload)
            wav_wrap.stdin.flush()
    except BrokenPipeError:
        # Wrapper died mid-write; the watchdog will restart it.
        pass
    except Exception:
        pass
|
|
|
def _safe_art_name(rel_path: str) -> str: |
|
"""Stable filesystem-safe name for album art based on relative path.""" |
|
h = hashlib.sha1(rel_path.encode("utf-8")).hexdigest() |
|
return f"{h}.jpg" |
|
|
|
def probe_duration(path: str, timeout: int = 6):
    """Run ffprobe once to get duration in seconds or None on failure."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=timeout,
        )
        stdout_text = proc.stdout.strip()
        # Empty stdout means ffprobe could not read the file.
        return float(stdout_text) if stdout_text else None
    except Exception:
        # ffprobe missing, timed out, or printed something non-numeric.
        return None
|
|
|
def extract_album_art(path: str, rel_path: str, timeout: int = 10):
    """
    Extract embedded album art to ALBUM_ART_CACHE_DIR and return cache filename,
    or None if no art or extraction failed.
    """
    os.makedirs(ALBUM_ART_CACHE_DIR, exist_ok=True)
    art_name = _safe_art_name(rel_path)
    art_path = os.path.join(ALBUM_ART_CACHE_DIR, art_name)

    def _have_art():
        # A non-empty cached file counts as a successful extraction.
        return os.path.exists(art_path) and os.path.getsize(art_path) > 0

    if _have_art():
        return art_name

    # Ask ffmpeg to copy the attached picture stream out as a jpeg; it exits
    # nonzero (leaving no usable file) when the track has no embedded art.
    try:
        subprocess.run(
            [
                "ffmpeg", "-y", "-v", "error",
                "-i", path,
                "-an", "-vcodec", "copy",
                art_path
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=False
        )
        if _have_art():
            return art_name
    except Exception:
        pass

    # Drop a zero-length leftover so future calls retry cleanly.
    try:
        if os.path.exists(art_path) and os.path.getsize(art_path) == 0:
            os.remove(art_path)
    except Exception:
        pass

    return None
|
|
|
def display_title(path):
    """Convert 'dir/a/b/Song - Title.ext' → 'Song - Title'."""
    if not path:
        return None
    stem, _ext = os.path.splitext(os.path.basename(path))
    return stem
|
|
|
# ----------------------------- |
|
# Pump helper for decoder -> wrapper |
|
# ----------------------------- |
|
def _pump_decoder_to_wrapper(decoder_stdout, stop_event):
    """
    Copy bytes from decoder_stdout to wrapper_proc.stdin until EOF or stop_event.
    Do NOT close wrapper_proc.stdin when done.

    Runs on a daemon thread started by stream_file(). Writes are taken under
    queue_lock so they interleave safely with write_silence().
    """
    global wav_wrap
    try:
        bufsize = 64 * 1024  # 64 KiB chunks per read
        while not stop_event.is_set():
            chunk = decoder_stdout.read(bufsize)
            if not chunk:
                # EOF: the decoder finished (or was terminated).
                break
            try:
                with queue_lock:
                    if wav_wrap and wav_wrap.poll() is None:
                        wav_wrap.stdin.write(chunk)
                        wav_wrap.stdin.flush()
                    else:
                        # wrapper died; stop pumping
                        break
            except BrokenPipeError:
                break
            except Exception:
                break
    except Exception:
        pass
    finally:
        # Always release the decoder's pipe; the wrapper stdin stays open so
        # the next track can reuse it.
        try:
            decoder_stdout.close()
        except Exception:
            pass
|
|
|
def write_silence_ms(ms: int = SILENCE_MS_AFTER_TRACK):
    """Convenience wrapper: emit *ms* milliseconds of PCM silence."""
    seconds = ms / 1000.0
    write_silence(seconds)
|
|
|
def is_zombie(proc):
    """True when *proc* is a zombie, or has disappeared entirely."""
    try:
        status = proc.status()
    except psutil.NoSuchProcess:
        return True
    return status == psutil.STATUS_ZOMBIE
|
|
|
def restart_radio_chain():
    """Kill every ffmpeg/pifmadv process, relaunch the chain, play a recovery ID."""
    print("[WATCHDOG] Restarting radio chain")

    def _kill_all(target):
        # Match by process name or argv[0]; ignore processes we cannot inspect.
        for proc in psutil.process_iter(['name', 'cmdline']):
            try:
                cmd = proc.info['cmdline'] or []
                if proc.info['name'] == target or (cmd and target in cmd[0]):
                    print(f"[WATCHDOG] Killing {target} PID {proc.pid}")
                    proc.kill()
            except Exception:
                pass

    _kill_all('ffmpeg')
    _kill_all('pifmadv')

    # Give the kernel a moment to reap before relaunching.
    time.sleep(0.5)

    print("[WATCHDOG] Restarting pifmadv")
    start_pifm()

    # Broadcast a station ID so listeners hear that we recovered.
    id_file = pick_station_id_file()
    if id_file:
        print(f"[WATCHDOG] Broadcasting recovery ID: {id_file}")
        stream_file(id_file, allow_skip=False)
        write_silence(0.25)
|
|
|
# ----------------------------- |
|
# Robust stream_file implementation |
|
# ----------------------------- |
|
def stream_file(path: str, allow_skip=True) -> bool:
    """
    Decode a file to RAW PCM (s16le, stereo, SAMPLE_RATE)
    and pump into the persistent WAV wrapper stdin.
    Returns True if decoder exited with code 0 (normal EOF).
    Returns False if aborted (skip/stop) or decoder error.

    allow_skip=False makes the track immune to skip_flag (used for station IDs).
    """
    global wav_wrap, stop_flag, skip_flag

    # Can't stream without a live wrapper process.
    if not wav_wrap or wav_wrap.poll() is not None:
        print("wav_wrap is not running")
        return False

    # Build decoder args to emit raw PCM matching wrapper input
    decoder_args = [
        "ffmpeg", "-nostdin", "-v", "error",
        "-i", path,
        "-f", "s16le",  # raw PCM
        "-af", "acompressor=threshold=-18dB:ratio=4:attack=5:release=100,alimiter=limit=0.0dB",  # compress just like a real radio station, because this is a real radio station
        "-ar", str(SAMPLE_RATE),
        "-ac", str(CHANNELS),
        "pipe:1"
    ]

    try:
        decoder = subprocess.Popen(
            decoder_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            bufsize=0
        )
    except Exception as e:
        print("[stream_file] failed to start decoder:", e)
        return False

    # Daemon thread copies decoder output into the wrapper's stdin.
    stop_event = threading.Event()
    pump_thread = threading.Thread(target=_pump_decoder_to_wrapper, args=(decoder.stdout, stop_event), daemon=True)
    pump_thread.start()

    try:
        # Poll the decoder while watching the global stop/skip flags.
        while True:
            ret = decoder.poll()
            if ret is not None:
                # Decoder exited; let pump flush briefly
                pump_thread.join(timeout=1)
                write_silence_ms(50)
                return ret == 0

            # Abort on global stop
            if stop_flag:
                try:
                    decoder.terminate()
                except Exception:
                    pass
                stop_event.set()
                pump_thread.join(timeout=1)
                write_silence_ms()
                return False

            # Abort on skip
            if allow_skip and skip_flag:
                try:
                    decoder.terminate()
                except Exception:
                    pass
                stop_event.set()
                pump_thread.join(timeout=1)
                # Do not clear skip_flag here; worker will clear under lock
                write_silence_ms()
                return False

            # 50 ms poll interval keeps abort latency low without busy-waiting.
            time.sleep(0.05)

    except Exception as e:
        print("[stream_file] Exception:", e)
        try:
            decoder.kill()
        except Exception:
            pass
        stop_event.set()
        pump_thread.join(timeout=1)
        write_silence_ms()
        return False
|
|
|
# ----------------------------- |
|
# Discover tracks (at startup) |
|
# ----------------------------- |
|
def discover_tracks(use_hash=False, save_every_n=200):
    """
    Walk MUSIC_DIR, reuse DB rows when mtime+size match.
    Probe/extract only for new/changed files. Save progress periodically.

    Args:
        use_hash: also compare SHA-1 content hashes so files whose mtime/size
            changed but whose bytes did not keep their cached metadata.
        save_every_n: log progress every N processed files (0/None disables).

    BUG FIX: the original called a module-level ``compute_sha1`` that is not
    defined anywhere in this file, so any call with use_hash=True raised
    NameError. A local helper now provides the hashing.
    """
    def _sha1_of_file(file_path, bufsize=1 << 20):
        # Stream the file through SHA-1; return hex digest, or None on I/O error.
        try:
            digest = hashlib.sha1()
            with open(file_path, "rb") as fh:
                while chunk := fh.read(bufsize):
                    digest.update(chunk)
            return digest.hexdigest()
        except Exception:
            return None

    init_db()
    TRACKS.clear()
    seen = set()
    count = 0

    for root, dirs, files in os.walk(MUSIC_DIR):
        for name in files:
            if not name.lower().endswith((".mp3", ".flac", ".wav", ".m4a", ".ogg", ".aac")):
                continue

            full = os.path.join(root, name)
            rel = os.path.relpath(full, MUSIC_DIR)
            seen.add(rel)

            # stat for change detection
            try:
                st = os.stat(full)
                mtime = int(st.st_mtime)
                size = st.st_size
            except Exception:
                mtime, size = None, None

            cached = db_get_entry(rel)
            reuse = False
            duration = None
            art = None
            sha = None

            if cached and cached.get("mtime") == mtime and cached.get("size") == size:
                # Unchanged on disk: trust the cached metadata.
                duration = cached.get("duration")
                art = cached.get("album_art")
                sha = cached.get("sha1")
                reuse = True
            elif cached and use_hash:
                # mtime/size differ: fall back to a content-hash comparison.
                sha_now = _sha1_of_file(full)
                if sha_now and sha_now == cached.get("sha1"):
                    duration = cached.get("duration")
                    art = cached.get("album_art")
                    # update mtime/size to current values
                    db_upsert_entry(rel, mtime, size, duration, art, sha_now)
                    reuse = True

            if not reuse:
                # New or changed file: probe duration and extract art now.
                duration = probe_duration(full)
                art = extract_album_art(full, rel)
                sha = _sha1_of_file(full) if use_hash else None
                db_upsert_entry(rel, mtime, size, duration, art, sha)

            TRACKS[rel] = {
                "id": rel,
                "filename": name,
                "path": rel,
                "duration": duration,
                "album_art": art
            }

            count += 1
            if save_every_n and count % save_every_n == 0:
                # small progress log
                app.logger.info("discover_tracks: processed %d files", count)

    # prune stale DB rows (entries not seen in this scan)
    prune_stale_db_entries(seen)

    app.logger.info("discover_tracks: finished, total=%d", len(TRACKS))
|
|
|
# ----------------------------- |
|
# Playback Worker |
|
# ----------------------------- |
|
def worker():
    """Playback loop (daemon thread): pulls the queue head, streams it, and
    interleaves station IDs by time/song-count thresholds.

    Shares queue/current_track/flags with the Flask handlers under queue_lock.
    """
    global current_track, stop_flag, skip_flag, last_station_id_time, songs_since_last_id
    global queue, shuffle_mode, repeat_mode

    while True:
        # Snapshot whether the queue is empty; clear current_track if so.
        with queue_lock:
            if not queue:
                current_track = None
                empty = True
            else:
                empty = False

        if empty:
            # Auto-DJ: keep the transmitter fed with a random track.
            track = pick_random_track()
            if track:
                print(f"[WORKER] Queue empty — adding random track: {track}")
                with queue_lock:
                    queue.append(track)
                    stop_flag = False
        try:
            if stop_flag:
                # Stop requested: flush state and idle briefly.
                with queue_lock:
                    current_track = None
                    queue.clear()
                write_silence(0.25)
                time.sleep(0.5)
                continue

            # Station ID condition: whichever comes first (time or songs)
            # NOTE(review): songs_since_last_id is never incremented anywhere
            # in this file, so only the time threshold can actually fire —
            # confirm whether an increment after each played song is missing.
            now = time.time()
            with queue_lock:
                time_elapsed = now - last_station_id_time
                songs_count = songs_since_last_id

            if time_elapsed >= STATION_ID_INTERVAL or songs_count >= STATION_ID_SONG_THRESHOLD:
                id_file = pick_station_id_file()
                if id_file:
                    with queue_lock:
                        current_track = None
                        songs_since_last_id = 0
                    set_rds("RT HushPupPi Station ID")
                    stream_file(id_file, allow_skip=False)  # IDs are not skippable
                    write_silence(0.25)
                    # Reset the last_station_id_time after playing
                    last_station_id_time = time.time()
                else:
                    # No station id files; reset timer so we don't spin
                    last_station_id_time = time.time()

            # Shuffle only when building the play order
            if shuffle_mode:
                with queue_lock:
                    random.shuffle(queue)

            # Repeat_one: loop the first track
            if repeat_mode == "repeat_one":
                with queue_lock:
                    current_track = queue[0] if queue else None
                    path_current = os.path.join(MUSIC_DIR, current_track) if current_track else None
                set_rds(f"RT {current_track}")
                played_ok = stream_file(path_current) if path_current else False
                write_silence(0.25)
                with queue_lock:
                    current_track = None
                time.sleep(0.1)
                continue

            # Normal / repeat_all
            with queue_lock:
                current_track = queue[0] if queue else None
                path_current = os.path.join(MUSIC_DIR, current_track) if current_track else None
            set_rds(f"RT {current_track}")

            print(f"[WORKER] Starting: {current_track} | queue={queue} | skip={skip_flag} stop={stop_flag}")

            # Blocks until the track ends, is skipped, stopped, or errors.
            played_ok = stream_file(path_current) if path_current else False

            # Snapshot flags under lock to avoid races
            with queue_lock:
                local_skip = skip_flag
                local_stop = stop_flag
                head = queue[0] if queue else None

            print(f"[WORKER] Finished stream_file: {current_track} played_ok={played_ok} skip={local_skip} stop={local_stop}")

            if local_stop:
                # stop_flag handled at top of loop
                continue

            if played_ok:
                # normal completion: remove head only if it matches
                with queue_lock:
                    if queue and queue[0] == head:
                        popped = queue.pop(0)
                        print(f"[WORKER] popped after play: {popped} | queue now {queue}")
                        if repeat_mode == "repeat_all":
                            queue.append(popped)
            else:
                # not played_ok: if it was a skip, clear skip and pop once
                if local_skip:
                    with queue_lock:
                        skip_flag = False
                        if queue and queue[0] == head:
                            popped = queue.pop(0)
                            print(f"[WORKER] Skip: popped {popped} | queue now {queue}")
                    write_silence(0.25)
                    continue
                if not current_track:
                    # there was no song to play
                    write_silence(1)
                    time.sleep(1)
                    stop_flag = True
                    continue
                # Real failure: log and drop the track to avoid infinite loop
                print("[WORKER] Track failed to play:", current_track)
                with queue_lock:
                    if queue and queue[0] == head:
                        dropped = queue.pop(0)
                        print(f"[WORKER] Dropped failed track: {dropped}")
                write_silence(0.25)
                time.sleep(0.5)
                continue

            with queue_lock:
                current_track = None

            time.sleep(0.1)

        except Exception as e:
            # Never let the playback thread die; log and retry after a pause.
            print("[WORKER] Exception in worker loop:", e)
            time.sleep(1)
|
|
|
# Serializes calls into restart_radio_chain() from the watchdog.
restart_lock = threading.Lock()


def watchdog():
    """Every 5s, verify pifmadv and the wrapper ffmpeg are alive; restart if not."""
    while True:
        time.sleep(5)

        pifm_state = "missing"
        ffmpeg_state = "missing"

        for proc in psutil.process_iter(['name', 'cmdline', 'status']):
            try:
                cmd = proc.info['cmdline'] or []
                name = proc.info['name']

                if name == "pifmadv" or (cmd and "pifmadv" in cmd[0]):
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        pifm_state = "zombie"
                    else:
                        pifm_state = "ok"

                if name == "ffmpeg" or (cmd and "ffmpeg" in cmd[0]):
                    # Only count ffmpeg processes that belong to your pipeline
                    # NOTE(review): this argv test ("-f"/"wav"/"-i"/"-") can also
                    # match the per-track decoder ffmpeg, not just the wrapper —
                    # confirm intent.
                    if any(x in cmd for x in ["-f", "wav", "-i", "-"]):
                        if proc.status() == psutil.STATUS_ZOMBIE:
                            ffmpeg_state = "zombie"
                        else:
                            ffmpeg_state = "ok"

            except Exception:
                # Process vanished mid-iteration; skip it.
                continue

        if pifm_state != "ok" or ffmpeg_state != "ok":
            with restart_lock:
                print(f"[WATCHDOG] pifmadv={pifm_state}, ffmpeg={ffmpeg_state}")
                restart_radio_chain()
|
|
|
# ----------------------------- |
|
# API Endpoints |
|
# ----------------------------- |
|
@app.get("/tracks")
def list_tracks():
    """Return all discovered tracks' metadata, sorted by relative path."""
    return jsonify(sorted(TRACKS.values(), key=lambda t: t["path"]))
|
|
|
@app.get("/album_art/<name>")
def album_art(name):
    """Serve a cached album-art file by its cache filename; empty 404 if absent.

    SECURITY FIX: the original existence check joined the raw URL segment onto
    the cache dir before send_from_directory's own sanitization ran, letting a
    crafted *name* (e.g. "..") probe paths outside the cache directory. We now
    reject anything that is not a bare filename before touching the filesystem.
    """
    # A traversal-safe name must equal its own basename (no separators, no "..").
    if os.path.basename(name) != name or name in ("", ".", ".."):
        return ("", 404)
    if not os.path.exists(os.path.join(ALBUM_ART_CACHE_DIR, name)):
        return ("", 404)
    # send_from_directory applies its own safe-join as a second layer.
    return send_from_directory(ALBUM_ART_CACHE_DIR, name)
|
|
|
@app.get("/queue")
def get_queue():
    """Return display titles for every queued track, in play order."""
    with queue_lock:
        snapshot = list(queue)
    return jsonify([display_title(p) for p in snapshot])
|
|
|
@app.post("/queue")
def add_to_queue():
    """Append a known track (by relative path) to the play queue."""
    global queue, stop_flag
    payload = request.get_json(force=True)
    track = payload.get("track")
    if not track:
        return jsonify({"error": "no track"}), 400

    # Only tracks discovered at startup may be queued.
    if track not in TRACKS:
        return jsonify({"error": "unknown track"}), 404

    with queue_lock:
        queue.append(track)
        stop_flag = False  # queuing implies the caller wants playback running
    return jsonify({"queued": track})
|
|
|
@app.get("/now")
def now_playing():
    """Return the currently-playing title plus its album-art URL (if any)."""
    with queue_lock:
        track = current_track
        art = None
        meta = TRACKS.get(track) if track else None
        if meta and meta.get("album_art"):
            art = f"/album_art/{meta['album_art']}"
        return jsonify({"track": display_title(track), "album_art": art})
|
|
|
@app.post("/shuffle")
def set_shuffle():
    """Enable or disable shuffle; JSON body: {"enabled": bool}."""
    global shuffle_mode
    payload = request.get_json(force=True)
    shuffle_mode = bool(payload.get("enabled", False))
    return jsonify({"shuffle": shuffle_mode})
|
|
|
@app.post("/repeat")
def set_repeat():
    """Set repeat mode; unrecognized modes leave the current setting untouched."""
    global repeat_mode
    requested = request.get_json(force=True).get("mode", "off")
    if requested in ("off", "repeat_one", "repeat_all"):
        repeat_mode = requested
    return jsonify({"repeat": repeat_mode})
|
|
|
@app.post("/skip")
def skip():
    """Ask the worker to abort the currently-streaming track."""
    global skip_flag
    with queue_lock:
        skip_flag = True  # cleared by the worker once it has acted on it
    return jsonify({"status": "skipped"})
|
|
|
@app.post("/stop")
def stop():
    """Stop playback: raise stop_flag, empty the queue, clear the current track."""
    global stop_flag, queue, current_track
    with queue_lock:
        stop_flag = True
        del queue[:]
        current_track = None
    return jsonify({"status": "stopping"})
|
|
|
# ----------------------------- |
|
# Startup |
|
# ----------------------------- |
|
if __name__ == "__main__":
    # Make sure expected directories exist before scanning or broadcasting.
    os.makedirs(MUSIC_DIR, exist_ok=True)
    os.makedirs(STATION_ID_DIR, exist_ok=True)

    # Build the in-memory track index (cached in sqlite), then bring up the
    # ffmpeg wrapper + pifmadv transmit chain.
    discover_tracks()
    start_pifm()

    # play station ID at startup so the listener(s) know we're up
    id_file = pick_station_id_file()
    if id_file:
        set_rds("RT HushPupPi Station ID")
        stream_file(id_file, allow_skip=False)
        write_silence(0.25)

    # Background playback loop and process watchdog (both daemon threads).
    threading.Thread(target=worker, daemon=True).start()
    threading.Thread(target=watchdog, daemon=True).start()
    # Reduce Flask logging noise in production
    import logging
    logging.getLogger("werkzeug").setLevel(logging.ERROR)
    app.run(host="0.0.0.0", port=5001, debug=False)
|
|
|
|