@melomane13 : Please try to use the code tag for code listing. Especially for python where indentation is crucial 
# ====================== REMOTE DETECTION ======================
print("Searching for the remote...")
while True:
for path in evdev.list_devices():
device = evdev.InputDevice(path)
if device.name == REMOTE_NAME:
print(f"'{REMOTE_NAME}' found at {path}.") # Exemple : /dev/input/event2
remote_device = device
break
else:
print(f"'{REMOTE_NAME}' not found. Retrying...")
time.sleep(2)
continue
break
remote = evdev.InputDevice(path)
knob = evdev.InputDevice("/dev/input/event0") # Assign rotary knob (assumed fixed at event0)
import asyncio
import glob
import os
import json
import signal
import socket
import sys
import time
import math
import evdev
import lgpio
from tm1637_lgpio import TM1637
from camilladsp import CamillaClient
# ========================= CONSTANTS =========================
REMOTE_NAME = "HID Remote01 Keyboard" # Remote name found using `python3 -m evdev.evtest`
POWER_OFF_DELAY = 2 # Delay (in minutes) before trigger power-off when no sound is detected
HALT_DELAY = 24 # Delay (in hours) before HALT raspberry when no sound is detected
POWER_GPIO = 4 # GPIO pin controlling the power relay
CLK, DIO = 23, 24 # Clock and Data pins for the TM1637 display
# Mapping of remote control keys to their functions
KEY_BINDINGS = {
'VOLUMEDOWN': 'KEY_VOLUMEDOWN', # Decrease volume
'VOLUMEUP': 'KEY_VOLUMEUP', # Increase volume
'MUTE': 'KEY_MUTE', # Mute/unmute audio
'PLAYPAUSE': 'KEY_PLAYPAUSE', # Play/pause music (LMS command)
'PREVIOUSSONG': 'KEY_PREVIOUSSONG', # Play previous song (LMS command)
'NEXTSONG': 'KEY_NEXTSONG', # Play next song (LMS command)
'UP': 'KEY_UP', # Increase treble
'DOWN': 'KEY_DOWN', # Decrease treble
'LEFT': 'KEY_LEFT', # Decrease bass
'RIGHT': 'KEY_RIGHT', # Increase bass
'POWER': 'KEY_POWER', # Display brightness,toggle Auto power, Shutdown
'ENTER': 'KEY_ENTER', # Toggle tone gain settings / Toogle display relative volume on long press
'BACK': 'KEY_BACK', # Switch to the next DSP configuration (prefixed with "_")
'HOMEPAGE': 'KEY_HOMEPAGE', # Switch to the next DSP configuration (prefixed with "|")
}
CONFIG_DIR = os.path.expanduser("~") + "/camilladsp/configs/" # Path to DSP configuration files
# ====================== GLOBAL VARIABLES ======================
key_hold_counter = 0
auto_power_enabled = True
is_waiting_for_sound = False
not_volume_displayed = False
is_applying_config = False
pending_config_update = False
is_key_held = False
is_volume_key_held = False
display_refresh_event = asyncio.Event()
# ====================== CONFIGURATION ======================
# Remote detection
print("Searching for the remote...")
TIMEOUT_SECONDS = 86400 # 24h
counter = 0
while True:
for path in evdev.list_devices():
device = evdev.InputDevice(path)
if device.name == REMOTE_NAME:
print(f"'{REMOTE_NAME}' found at {path}.")
remote = device
remote.grab()
break
else:
counter += 1
if counter >= TIMEOUT_SECONDS:
print(f"'{REMOTE_NAME}' not found after {TIMEOUT_SECONDS} secondes. Shutting down...")
os.system("sudo shutdown -h now")
break
print(f"'{REMOTE_NAME}' not found. Retrying...")
time.sleep(1)
continue
break
# Assign rotary knob (assumed at event0)
knob = evdev.InputDevice("/dev/input/event0")
# Connect to CamillaDSP
cdsp = CamillaClient("127.0.0.1", 1234)
cdsp.connect()
config_active = cdsp.config.active()
# GPIO setup
h = lgpio.gpiochip_open(0) # Open the GPIO chip
lgpio.gpio_claim_output(h, POWER_GPIO) # Configure GPIO as output
lgpio.gpio_write(h, POWER_GPIO, 1) # Set initial state to HIGH
# TM1637 Display Configuration
tm = TM1637(clk=CLK, dio=DIO)
tm.write(tm.encode_string(" " *6))
tm.brightness(1)
# ====================== FONCTIONS =============================
def swap(segs):
length = len(segs)
if length == 4 or length == 5:
segs.extend(bytearray([0] * (6 - length)))
segs[0], segs[2] = segs[2], segs[0]
if length >= 4:
segs[3], segs[5] = segs[5], segs[3]
return segs
def change_volume_from_key(cdsp, key, current_volume, is_muted=None, volume_step=1):
volume_change = -volume_step if key == KEY_BINDINGS['VOLUMEDOWN'] else volume_step
new_volume = max(-100, min(0, current_volume + volume_change))
if new_volume != current_volume:
cdsp.volume.set_main_volume(new_volume)
display_volume_info(new_volume)
def get_bass_treble(config, mode="gain"):
try:
filters = config.get('filters', {})
bass = filters['Bass']['parameters']
treble = filters['Treble']['parameters']
if mode == "gain":
return bass['gain'], treble['gain']
elif mode == "parameters":
return bass, treble
else:
raise ValueError("Mode invalide : utiliser 'gain' ou 'parameters'")
except (KeyError, TypeError):
return None, None
def display_volume_info(current_volume=None, is_muted=None):
global not_volume_displayed, blank_volume_when_mute
if current_volume is None:
current_volume = cdsp.volume.main_volume()
if is_muted is None:
is_muted = cdsp.volume.main_mute()
config_path = cdsp.config.file_path().replace(CONFIG_DIR, '')[1:4]
vol = round(current_volume)
display_vol = max(-99, vol)
width = len(str(abs(display_vol)))
if display_vol == 0:
blanks = ' -'
else:
blanks = (' --', '---')[min(width, 2) - 1]
if is_muted and not is_volume_key_held:
blank_volume_when_mute = True
volume_str = f"{config_path}{blanks}"
else:
blank_volume_when_mute = False
volume_str = f"{config_path}{display_vol:3}"
tm.write(swap(tm.encode_string(volume_str)))
not_volume_displayed = False
def display_tone_info():
global not_volume_displayed
bass_gain, treble_gain = get_bass_treble(config_active, mode="gain")
if bass_gain is not None and treble_gain is not None:
bass_gain = round(bass_gain)
treble_gain = round(treble_gain)
tm.write(swap(tm.encode_string(f"{bass_gain:2}B{treble_gain:2}T")))
not_volume_displayed = True
def send_lms_command(command):
"""Retrieve the MAC address of the default network interface, format it,
and send a command to LMS using the MAC address as an identifier."""
try:
# Get the default network interface
with os.popen("ip route show default") as route_info:
# Extract the interface name from the routing table entry
default_iface = next((line.split()[4] for line in route_info if "default" in line), None)
# Raise an exception if no default network interface is found
if not default_iface:
raise ValueError("No default network interface found")
# Read the MAC address of the interface from the system file
mac_path = f"/sys/class/net/{default_iface}/address"
with open(mac_path) as f:
mac = f.read().strip()
# Format the MAC address to be URL-encoded (%3A instead of ':')
mac = mac.replace(":", "%3A").lower()
# Get the hostname of the device
host = socket.gethostname()
port = 9090
# Create a socket connection to LMS and send the command
with socket.create_connection((host, port)) as sock:
sock.sendall(f"{mac} {command}\r\nexit\r\n".encode("utf-8"))
response = sock.recv(4096).decode("utf-8") # Read the response
return response.strip()
except Exception as e:
# Print error if any exception occurs
print(f"Error: {e}")
return None
def get_repeat_speed(volume, direction, exponent=2.0, pivot=-50):
volume = max(-100, min(0, volume))
min_delay = 0.01
max_delay = 0.5
if direction == "up":
if volume >= pivot:
t = (volume - pivot) / (0 - pivot)
else:
t = 0
factor = math.pow(t, exponent)
return min_delay + factor * (max_delay - min_delay)
elif direction == "down":
if volume <= pivot:
t = (pivot - volume) / (pivot - (-100))
else:
t = 0
factor = math.pow(t, exponent)
return min_delay + factor * (max_delay - min_delay)
async def change_config(cdsp, config_pattern):
global config_active
bass_gain, treble_gain = get_bass_treble(config_active, mode="gain")
config_files = glob.glob(config_pattern)
current_config = cdsp.config.file_path()
if not config_files:
return
try:
index = config_files.index(current_config)
new_config_path = config_files[(index + 1) % len(config_files)]
except ValueError:
new_config_path = config_files[0]
if new_config_path == current_config:
return
new_config_data = cdsp.config.read_and_parse_file(new_config_path)
filters = new_config_data.get('filters', {})
if isinstance(filters, dict):
if bass_gain is not None:
try:
filters['Bass']['parameters']['gain'] = bass_gain
except KeyError:
pass
if treble_gain is not None:
try:
filters['Treble']['parameters']['gain'] = treble_gain
except KeyError:
pass
# Normalize relative paths
config_dir = os.path.dirname(os.path.abspath(new_config_path))
for name, filt in filters.items():
params = filt.get("parameters", {})
filename = params.get("filename")
if filename and isinstance(filename, str) and not os.path.isabs(filename):
abs_path = os.path.normpath(os.path.join(config_dir, filename))
filters[name]["parameters"]["filename"] = abs_path
cdsp.config.set_active(new_config_data)
cdsp.config.set_file_path(new_config_path)
config_active = new_config_data
display_volume_info()
async def handle_gain_change_async(key, cdsp, tm):
global is_applying_config, pending_config_update, not_volume_displayed, config_active
key_map = {
KEY_BINDINGS['UP']: ("Treble", 1),
KEY_BINDINGS['DOWN']: ("Treble", -1),
KEY_BINDINGS['RIGHT']: ("Bass", 1),
KEY_BINDINGS['LEFT']: ("Bass", -1),
}
if not (param_delta := key_map.get(key)) or (param := param_delta[0]) not in (filters := config_active.get("filters", {})): return
delta = param_delta[1]
if not not_volume_displayed:
display_tone_info()
not_volume_displayed = True
return
gain = filters[param]["parameters"].get("gain", 0)
new_gain = max(-6, min(6, gain + delta))
if new_gain == gain:
return
filters[param]["parameters"]["gain"] = new_gain
# Display
bass_gain = round(filters['Bass']['parameters'].get('gain', 0))
treble_gain = round(filters['Treble']['parameters'].get('gain', 0))
gain_format = f"{bass_gain:2}B{treble_gain:2}T"
encoded = tm.encode_string(gain_format)
swapped = swap(encoded)
tm.write(swapped)
not_volume_displayed = False
# Trigger update if needed
if not is_applying_config:
asyncio.create_task(apply_config_async(cdsp))
else:
pending_config_update = True
async def apply_config_async(cdsp):
global is_applying_config, pending_config_update, not_volume_displayed
is_applying_config = True
while True:
pending_config_update = False
cdsp.config.set_active(config_active)
not_volume_displayed = True
await asyncio.sleep(0.05) # minimal delay to accumulate keypresses
if not pending_config_update:
break # no new modification → stop
is_applying_config = False
async def display_manager_loop():
global not_volume_displayed, key_hold_counter, is_key_held, is_volume_key_held
idleDisplayCounter = 0
while True:
if is_volume_key_held:
is_volume_key_held = False
await asyncio.sleep(0.9)
display_volume_info()
if not_volume_displayed:
key_hold_counter += 1
if key_hold_counter == 5:
key_hold_counter = 0
display_volume_info()
if is_key_held:
idleDisplayCounter = 0
is_key_held = False
if "-1000.0" in str(cdsp.levels.capture_rms()):
idleDisplayCounter +=1
if idleDisplayCounter == ( POWER_OFF_DELAY*60 ) + 5 :
idleDisplayCounter = 0
tm.write(tm.encode_string(" " *6)) # Clear the display
try:
await asyncio.wait_for(display_refresh_event.wait(), timeout=1.0)
display_refresh_event.clear()
except asyncio.TimeoutError:
pass
async def toggle_power():
global auto_power_enabled, is_waiting_for_sound
if not auto_power_enabled:
# If the relay is off, activate it and re-enable auto shutdown
auto_power_enabled = True
lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH) # Activate the relay
tm.write(swap(tm.encode_string("PW ON"))) # Display "POWER ON"
is_waiting_for_sound = False # No need to wait for sound anymore
else:
# If auto_power_enabled, check if the relay was turned off by auto shutdown
if is_waiting_for_sound:
# If waiting for sound to turn back on, force relay activation
lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH) # Activate the relay
tm.write(swap(tm.encode_string("PW ON"))) # Display "POWER ON"
is_waiting_for_sound = False # Reset sound waiting state
else:
# Otherwise, deactivate the relay normally
auto_power_enabled = False
lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW) # Deactivate the relay
tm.write(swap(tm.encode_string("PW OFF"))) # Display "POWER OFF"
await asyncio.sleep(3)
display_volume_info() # if auto_power_enabled else tm.write(tm.encode_string(" "))
async def auto_poweroff():
global auto_power_enabled, is_waiting_for_sound
silenceCounter = 0
last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)
while True:
await asyncio.sleep(1)
if "-1000.0" in str(cdsp.levels.capture_rms()): # No sound detected
silenceCounter += 1
if silenceCounter == POWER_OFF_DELAY*60:
if not is_waiting_for_sound:
# Auto power-off: deactivate the relay after the delay without sound
lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
tm.write(swap(tm.encode_string("PW OFF")))
last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO) # Read the actual state
is_waiting_for_sound = True # Wait for sound or Power button press to reactivate
if silenceCounter >= int(HALT_DELAY * 3600):
tm.write(swap(tm.encode_string(" HALT ")))
os.system("sudo shutdown -h now")
break
else: # Sound detected
silenceCounter = 0
if auto_power_enabled and last_power_relay_state == 0:
# If auto_power_enabled and sound is detected, reactivate the relay
if is_waiting_for_sound:
lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH)
display_volume_info()
last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)
is_waiting_for_sound = False # Reset the wait-for-sound flag
elif not auto_power_enabled and last_power_relay_state == 1:
# Always turn off the relay if not auto_power_enabled
lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)
async def remote_events(device):
global not_volume_displayed, key_hold_counter, is_key_held, is_volume_key_held
bass_gain_prev = treble_gain_prev = repeat_throttle = br_direction = last_repeat_time = 0
while True:
try:
# Process events asynchronously from the device
async for event in device.async_read_loop():
if event.type == evdev.ecodes.EV_KEY:
attrib = evdev.categorize(event)
key = attrib.keycode
if attrib.keystate == 1: # Key pressed
is_key_held = True
if key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
is_volume_key_held = True
current_volume = cdsp.volume.main_volume()
is_muted = cdsp.volume.main_mute()
if not_volume_displayed or blank_volume_when_mute:
display_volume_info(current_volume, is_muted)
else:
change_volume_from_key(cdsp, key, current_volume, is_muted)
elif key == KEY_BINDINGS['PREVIOUSSONG']:
# Command to play the previous song
send_lms_command("playlist index -1")
elif key == KEY_BINDINGS['NEXTSONG']:
# Command to play the next song
send_lms_command("playlist index +1")
elif key in (
KEY_BINDINGS['UP'],
KEY_BINDINGS['DOWN'],
KEY_BINDINGS['RIGHT'],
KEY_BINDINGS['LEFT']
):
asyncio.create_task(handle_gain_change_async(key, cdsp, tm))
elif key == KEY_BINDINGS['BACK']:
await change_config(cdsp, CONFIG_DIR + '_*')
elif key == KEY_BINDINGS['HOMEPAGE']:
await change_config(cdsp, CONFIG_DIR + '|*')
elif key == KEY_BINDINGS['ENTER']:
if not_volume_displayed:
bass_params, treble_params = get_bass_treble(config_active, mode="parameters")
if bass_params is not None and treble_params is not None:
if bass_params['gain'] == treble_params['gain'] == 0:
bass_params['gain'], treble_params['gain'] = bass_gain_prev, treble_gain_prev
bass_gain_now, treble_gain_now = bass_gain_prev, treble_gain_prev
bass_gain_prev, treble_gain_prev = 0, 0
else:
bass_gain_prev, treble_gain_prev = bass_params['gain'], treble_params['gain']
bass_params['gain'] = treble_params['gain'] = 0
bass_gain_now = treble_gain_now = 0
cdsp.config.set_active(config_active)
display_tone_info()
else:
display_tone_info()
if attrib.keystate == 2: # Key held down
if key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
is_volume_key_held = True
current_time = time.time()
current_volume = cdsp.volume.main_volume()
is_muted = cdsp.volume.main_mute()
if key == KEY_BINDINGS['VOLUMEUP']:
direction = "up"
exponent = 2.0
pivot=-60
else:
direction = "down"
exponent = 2.0
pivot=-40
repeat_speed = get_repeat_speed(current_volume, direction, exponent=exponent, pivot=pivot)
if current_time - last_repeat_time >= repeat_speed:
step = 1
change_volume_from_key(cdsp, key, current_volume, is_muted, volume_step=step)
last_repeat_time = current_time
elif key == KEY_BINDINGS['PLAYPAUSE']:
# Stop LMS
key_hold_counter += 1
if key_hold_counter == 10:
send_lms_command("stop")
elif key == KEY_BINDINGS['POWER']:
key_hold_counter += 1
if key_hold_counter == 10:
await toggle_power()
elif key_hold_counter == 40:
tm.write(swap(tm.encode_string(" HALT ")))
os.system("sudo shutdown -h now")
elif key in (
KEY_BINDINGS['UP'],
KEY_BINDINGS['DOWN'],
KEY_BINDINGS['RIGHT'],
KEY_BINDINGS['LEFT']
):
repeat_throttle += 1
if repeat_throttle >= 4:
asyncio.create_task(handle_gain_change_async(key, cdsp, tm))
repeat_throttle = 0
if attrib.keystate == 0: # Key released
if KEY_BINDINGS['MUTE'] in key:
# Toggle mute status
if key_hold_counter < 20:
is_muted = not cdsp.volume.main_mute()
cdsp.volume.set_main_mute(is_muted)
current_volume = cdsp.volume.main_volume()
display_volume_info(current_volume, is_muted)
elif key == KEY_BINDINGS['PLAYPAUSE'] and key_hold_counter < 10:
send_lms_command("pause")
elif key == KEY_BINDINGS['POWER'] and key_hold_counter < 10:
# Adjust screen brightness on POWER key release
if br_direction == 1:
if tm.brightness() < 2:
tm.brightness(tm.brightness() + 1)
else:
tm.brightness(tm.brightness() - 1)
br_direction = -1
else:
if tm.brightness() > 0:
tm.brightness(tm.brightness() - 1)
else:
tm.brightness(tm.brightness() + 1)
br_direction = 1
elif key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
display_refresh_event.set()
key_hold_counter = 0
repeat_throttle = 0
last_repeat_time = 0
except OSError as e:
print(f"❌ Critical error: {e}. Restarting the script...")
python = sys.executable
os.execv(python, [python] + sys.argv) # Replaces the current process
def exit_gracefully(signal, frame):
"""Gracefully shuts down the service, including cleanup of GPIO resources."""
try:
lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
time.sleep(0.5)
# Properly close GPIO resources
lgpio.gpiochip_close(h)
# Exit with a success code
print("Gracefully shutting down the service...")
sys.exit(0)
except Exception as e:
# If there's an error during cleanup, print the error and exit with a failure code
print(f"Error during cleanup: {e}")
sys.exit(1)
# Register the signal handler for clean shutdown
signal.signal(signal.SIGINT, exit_gracefully) # Handle Ctrl+C (SIGINT)
signal.signal(signal.SIGTERM, exit_gracefully) # Handle termination signal (SIGTERM)
# Show startup animation and current volume info
for i in range(11):
tm.write(swap(tm.encode_string((" -".ljust(16))[i:i+6])))
time.sleep(0.3)
display_volume_info()
loop = asyncio.get_event_loop()
loop.create_task(auto_poweroff())
loop.create_task(display_manager_loop())
for device in remote, knob:
asyncio.ensure_future(remote_events(device))
loop.run_forever()
#EOF