• Welcome to ASR. There are many reviews of audio hardware and expert members to help answer your questions. Click here to have your audio equipment measured for free!

Display, remote and trigger power for camilladsp streamer and preamp, ( alternative to mdsimon2’s implementation )

Python:
# ====================== REMOTE DETECTION ======================

print("Searching for the remote...")

while True:
    for path in evdev.list_devices():
        device = evdev.InputDevice(path)
        if device.name == REMOTE_NAME:
            print(f"'{REMOTE_NAME}' found at {path}.")  # Exemple : /dev/input/event2
            remote_device = device
            break
    else:
        print(f"'{REMOTE_NAME}' not found. Retrying...")
        time.sleep(2)
        continue
    break


remote = evdev.InputDevice(path)
knob = evdev.InputDevice("/dev/input/event0")  # Assign rotary knob (assumed fixed at event0)
 
variable speed for volume and others mod, also use esp32-c3 microcontroller for shutdown .

Python:
import asyncio
import glob
import os
import json
import signal
import socket
import sys
import time
import math
import evdev
import lgpio
from tm1637_lgpio import TM1637
from camilladsp import CamillaClient


# ========================= CONSTANTS =========================


REMOTE_NAME = "HID Remote01 Keyboard"   # Remote name found using `python3 -m evdev.evtest`
POWER_OFF_DELAY = 2                     # Delay (in minutes) before trigger power-off when no sound is detected
HALT_DELAY = 24                         # Delay (in hours) before HALT raspberry when no sound is detected
POWER_GPIO = 4                          # GPIO pin controlling the power relay
CLK, DIO = 23, 24                       # Clock and Data pins for the TM1637 display

# Mapping of remote control keys to their functions
KEY_BINDINGS = {
    'VOLUMEDOWN': 'KEY_VOLUMEDOWN',     # Decrease volume
    'VOLUMEUP': 'KEY_VOLUMEUP',         # Increase volume
    'MUTE': 'KEY_MUTE',                 # Mute/unmute audio
    'PLAYPAUSE': 'KEY_PLAYPAUSE',       # Play/pause music (LMS command)
    'PREVIOUSSONG': 'KEY_PREVIOUSSONG', # Play previous song (LMS command)
    'NEXTSONG': 'KEY_NEXTSONG',         # Play next song (LMS command)
    'UP': 'KEY_UP',                     # Increase treble
    'DOWN': 'KEY_DOWN',                 # Decrease treble
    'LEFT': 'KEY_LEFT',                 # Decrease bass
    'RIGHT': 'KEY_RIGHT',               # Increase bass
    'POWER': 'KEY_POWER',               # Display brightness,toggle Auto power, Shutdown
    'ENTER': 'KEY_ENTER',               # Toggle tone gain settings / Toogle display relative volume on long press
    'BACK': 'KEY_BACK',                 # Switch to the next DSP configuration (prefixed with "_")
    'HOMEPAGE': 'KEY_HOMEPAGE',         # Switch to the next DSP configuration (prefixed with "|")
}

CONFIG_DIR = os.path.expanduser("~") + "/camilladsp/configs/"  # Path to DSP configuration files


# ====================== GLOBAL VARIABLES ======================


key_hold_counter      = 0
auto_power_enabled    = True
is_waiting_for_sound  = False
not_volume_displayed  = False
is_applying_config    = False
pending_config_update = False
is_key_held           = False
is_volume_key_held    = False
display_refresh_event = asyncio.Event()


# ====================== CONFIGURATION ======================


# Remote detection

print("Searching for the remote...")

TIMEOUT_SECONDS = 86400  # 24h
counter = 0

while True:
    for path in evdev.list_devices():
        device = evdev.InputDevice(path)
        if device.name == REMOTE_NAME:
            print(f"'{REMOTE_NAME}' found at {path}.")
            remote = device
            remote.grab()
            break
    else:
        counter += 1
        if counter >= TIMEOUT_SECONDS:
            print(f"'{REMOTE_NAME}' not found after {TIMEOUT_SECONDS} secondes. Shutting down...")
            os.system("sudo shutdown -h now")
            break
        print(f"'{REMOTE_NAME}' not found. Retrying...")
        time.sleep(1)
        continue
    break


# Assign rotary knob (assumed at event0)
knob = evdev.InputDevice("/dev/input/event0")

# Connect to CamillaDSP
cdsp = CamillaClient("127.0.0.1", 1234)
cdsp.connect()
config_active = cdsp.config.active()

# GPIO setup
h = lgpio.gpiochip_open(0)  # Open the GPIO chip
lgpio.gpio_claim_output(h, POWER_GPIO)  # Configure GPIO as output
lgpio.gpio_write(h, POWER_GPIO, 1)  # Set initial state to HIGH

# TM1637 Display Configuration
tm = TM1637(clk=CLK, dio=DIO)
tm.write(tm.encode_string(" " *6))
tm.brightness(1)


# ====================== FONCTIONS   =============================


def swap(segs):
    length = len(segs)
    if length == 4 or length == 5:
        segs.extend(bytearray([0] * (6 - length)))
    segs[0], segs[2] = segs[2], segs[0]
    if length >= 4:
        segs[3], segs[5] = segs[5], segs[3]
    return segs


def change_volume_from_key(cdsp, key, current_volume, is_muted=None, volume_step=1):
    volume_change = -volume_step if key == KEY_BINDINGS['VOLUMEDOWN'] else volume_step
    new_volume = max(-100, min(0, current_volume + volume_change))

    if new_volume != current_volume:
        cdsp.volume.set_main_volume(new_volume)
        display_volume_info(new_volume)


def get_bass_treble(config, mode="gain"):
    try:
        filters = config.get('filters', {})
        bass = filters['Bass']['parameters']
        treble = filters['Treble']['parameters']

        if mode == "gain":
            return bass['gain'], treble['gain']
        elif mode == "parameters":
            return bass, treble
        else:
            raise ValueError("Mode invalide : utiliser 'gain' ou 'parameters'")
    except (KeyError, TypeError):
        return None, None


def display_volume_info(current_volume=None, is_muted=None):
    global not_volume_displayed, blank_volume_when_mute

    if current_volume is None:
        current_volume = cdsp.volume.main_volume()

    if is_muted is None:
        is_muted = cdsp.volume.main_mute()

    config_path = cdsp.config.file_path().replace(CONFIG_DIR, '')[1:4]
    vol = round(current_volume)
    display_vol = max(-99, vol)

    width = len(str(abs(display_vol)))

    if display_vol == 0:
        blanks = '  -'
    else:
        blanks = (' --', '---')[min(width, 2) - 1]

    if is_muted and not is_volume_key_held:
        blank_volume_when_mute = True
        volume_str = f"{config_path}{blanks}"
    else:
        blank_volume_when_mute = False
        volume_str = f"{config_path}{display_vol:3}"

    tm.write(swap(tm.encode_string(volume_str)))
    not_volume_displayed = False


def display_tone_info():
    global not_volume_displayed
    bass_gain, treble_gain = get_bass_treble(config_active, mode="gain")

    if bass_gain is not None and treble_gain is not None:
        bass_gain = round(bass_gain)
        treble_gain = round(treble_gain)
        tm.write(swap(tm.encode_string(f"{bass_gain:2}B{treble_gain:2}T")))
        not_volume_displayed = True


def send_lms_command(command):
    """Retrieve the MAC address of the default network interface, format it,
    and send a command to LMS using the MAC address as an identifier."""
    try:
        # Get the default network interface
        with os.popen("ip route show default") as route_info:
            # Extract the interface name from the routing table entry
            default_iface = next((line.split()[4] for line in route_info if "default" in line), None)

        # Raise an exception if no default network interface is found
        if not default_iface:
            raise ValueError("No default network interface found")

        # Read the MAC address of the interface from the system file
        mac_path = f"/sys/class/net/{default_iface}/address"
        with open(mac_path) as f:
            mac = f.read().strip()

        # Format the MAC address to be URL-encoded (%3A instead of ':')
        mac = mac.replace(":", "%3A").lower()

        # Get the hostname of the device
        host = socket.gethostname()
        port = 9090

        # Create a socket connection to LMS and send the command
        with socket.create_connection((host, port)) as sock:
            sock.sendall(f"{mac} {command}\r\nexit\r\n".encode("utf-8"))
            response = sock.recv(4096).decode("utf-8")  # Read the response
            return response.strip()

    except Exception as e:
        # Print error if any exception occurs
        print(f"Error: {e}")
        return None


def get_repeat_speed(volume, direction, exponent=2.0, pivot=-50):

    volume = max(-100, min(0, volume))

    min_delay = 0.01
    max_delay = 0.5

    if direction == "up":

        if volume >= pivot:
            t = (volume - pivot) / (0 - pivot)
        else:
            t = 0
        factor = math.pow(t, exponent)
        return min_delay + factor * (max_delay - min_delay)

    elif direction == "down":
        if volume <= pivot:
            t = (pivot - volume) / (pivot - (-100))
        else:
            t = 0
        factor = math.pow(t, exponent)
        return min_delay + factor * (max_delay - min_delay)


async def change_config(cdsp, config_pattern):
    global config_active

    bass_gain, treble_gain = get_bass_treble(config_active, mode="gain")

    config_files = glob.glob(config_pattern)
    current_config = cdsp.config.file_path()

    if not config_files:
        return

    try:
        index = config_files.index(current_config)
        new_config_path = config_files[(index + 1) % len(config_files)]
    except ValueError:
        new_config_path = config_files[0]

    if new_config_path == current_config:
        return

    new_config_data = cdsp.config.read_and_parse_file(new_config_path)
    filters = new_config_data.get('filters', {})
    if isinstance(filters, dict):

        if bass_gain is not None:
            try:
                filters['Bass']['parameters']['gain'] = bass_gain
            except KeyError:
                pass

        if treble_gain is not None:
            try:
                filters['Treble']['parameters']['gain'] = treble_gain
            except KeyError:
                pass

        # Normalize relative paths
        config_dir = os.path.dirname(os.path.abspath(new_config_path))
        for name, filt in filters.items():
            params = filt.get("parameters", {})
            filename = params.get("filename")
            if filename and isinstance(filename, str) and not os.path.isabs(filename):
                abs_path = os.path.normpath(os.path.join(config_dir, filename))
                filters[name]["parameters"]["filename"] = abs_path

    cdsp.config.set_active(new_config_data)
    cdsp.config.set_file_path(new_config_path)
    config_active = new_config_data
    display_volume_info()


async def handle_gain_change_async(key, cdsp, tm):
    global is_applying_config, pending_config_update, not_volume_displayed, config_active

    key_map = {
        KEY_BINDINGS['UP']:    ("Treble", 1),
        KEY_BINDINGS['DOWN']:  ("Treble", -1),
        KEY_BINDINGS['RIGHT']: ("Bass", 1),
        KEY_BINDINGS['LEFT']:  ("Bass", -1),
    }

    if not (param_delta := key_map.get(key)) or (param := param_delta[0]) not in (filters := config_active.get("filters", {})): return
    delta = param_delta[1]

    if not not_volume_displayed:
        display_tone_info()
        not_volume_displayed = True
        return

    gain = filters[param]["parameters"].get("gain", 0)
    new_gain = max(-6, min(6, gain + delta))
    if new_gain == gain:
        return

    filters[param]["parameters"]["gain"] = new_gain

    # Display
    bass_gain = round(filters['Bass']['parameters'].get('gain', 0))
    treble_gain = round(filters['Treble']['parameters'].get('gain', 0))
    gain_format = f"{bass_gain:2}B{treble_gain:2}T"
    encoded = tm.encode_string(gain_format)
    swapped = swap(encoded)
    tm.write(swapped)
    not_volume_displayed = False

    # Trigger update if needed
    if not is_applying_config:
        asyncio.create_task(apply_config_async(cdsp))
    else:
        pending_config_update = True


async def apply_config_async(cdsp):
    global is_applying_config, pending_config_update, not_volume_displayed
    is_applying_config = True

    while True:
        pending_config_update = False
        cdsp.config.set_active(config_active)
        not_volume_displayed = True
        await asyncio.sleep(0.05)  # minimal delay to accumulate keypresses

        if not pending_config_update:
            break  # no new modification → stop

    is_applying_config = False


async def display_manager_loop():
    global not_volume_displayed, key_hold_counter, is_key_held, is_volume_key_held
    idleDisplayCounter = 0

    while True:

        if is_volume_key_held:
            is_volume_key_held = False
            await asyncio.sleep(0.9)
            display_volume_info()

        if not_volume_displayed:
            key_hold_counter += 1
            if key_hold_counter == 5:
                key_hold_counter = 0
                display_volume_info()

        if is_key_held:
            idleDisplayCounter = 0
            is_key_held = False

        if "-1000.0" in str(cdsp.levels.capture_rms()):
            idleDisplayCounter +=1
            if idleDisplayCounter == ( POWER_OFF_DELAY*60 ) + 5 :
                idleDisplayCounter = 0
                tm.write(tm.encode_string(" " *6))  # Clear the display

        try:
            await asyncio.wait_for(display_refresh_event.wait(), timeout=1.0)
            display_refresh_event.clear()
        except asyncio.TimeoutError:
            pass


async def toggle_power():
    global auto_power_enabled, is_waiting_for_sound

    if not auto_power_enabled:
        # If the relay is off, activate it and re-enable auto shutdown
        auto_power_enabled = True
        lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH)  # Activate the relay
        tm.write(swap(tm.encode_string("PW ON")))  # Display "POWER ON"
        is_waiting_for_sound = False  # No need to wait for sound anymore

    else:
        # If auto_power_enabled, check if the relay was turned off by auto shutdown
        if is_waiting_for_sound:
            # If waiting for sound to turn back on, force relay activation
            lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH)  # Activate the relay
            tm.write(swap(tm.encode_string("PW ON")))  # Display "POWER ON"
            is_waiting_for_sound = False  # Reset sound waiting state

        else:
            # Otherwise, deactivate the relay normally
            auto_power_enabled = False
            lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)  # Deactivate the relay
            tm.write(swap(tm.encode_string("PW OFF")))  # Display "POWER OFF"

    await asyncio.sleep(3)
    display_volume_info()  # if auto_power_enabled else tm.write(tm.encode_string("      "))


async def auto_poweroff():
    global auto_power_enabled, is_waiting_for_sound
    silenceCounter = 0
    last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)

    while True:
        await asyncio.sleep(1)

        if "-1000.0" in str(cdsp.levels.capture_rms()):  # No sound detected

            silenceCounter += 1
            if silenceCounter == POWER_OFF_DELAY*60:
                if not is_waiting_for_sound:
                    # Auto power-off: deactivate the relay after the delay without sound
                    lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
                    tm.write(swap(tm.encode_string("PW OFF")))
                    last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)  # Read the actual state
                    is_waiting_for_sound = True  # Wait for sound or Power button press to reactivate

            if silenceCounter >= int(HALT_DELAY * 3600):
                tm.write(swap(tm.encode_string(" HALT ")))
                os.system("sudo shutdown -h now")
                break

        else:  # Sound detected
            silenceCounter = 0
            if auto_power_enabled and last_power_relay_state == 0:
                # If auto_power_enabled and sound is detected, reactivate the relay
                if is_waiting_for_sound:
                    lgpio.gpio_write(h, POWER_GPIO, lgpio.HIGH)
                    display_volume_info()
                    last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)
                    is_waiting_for_sound = False  # Reset the wait-for-sound flag
            elif not auto_power_enabled and last_power_relay_state == 1:
                # Always turn off the relay if not auto_power_enabled
                lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
                last_power_relay_state = lgpio.gpio_read(h, POWER_GPIO)


async def remote_events(device):
    global not_volume_displayed, key_hold_counter, is_key_held, is_volume_key_held

    bass_gain_prev = treble_gain_prev = repeat_throttle = br_direction = last_repeat_time = 0

    while True:
        try:
            # Process events asynchronously from the device
            async for event in device.async_read_loop():
                if event.type == evdev.ecodes.EV_KEY:
                    attrib = evdev.categorize(event)
                    key = attrib.keycode

                    if attrib.keystate == 1:  # Key pressed
                        is_key_held = True

                        if key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
                            is_volume_key_held = True
                            current_volume = cdsp.volume.main_volume()
                            is_muted = cdsp.volume.main_mute()
                            if not_volume_displayed or blank_volume_when_mute:
                                display_volume_info(current_volume, is_muted)
                            else:
                                change_volume_from_key(cdsp, key, current_volume, is_muted)

                        elif key == KEY_BINDINGS['PREVIOUSSONG']:
                            # Command to play the previous song
                            send_lms_command("playlist index -1")

                        elif key == KEY_BINDINGS['NEXTSONG']:
                            # Command to play the next song
                            send_lms_command("playlist index +1")

                        elif key in (
                            KEY_BINDINGS['UP'],
                            KEY_BINDINGS['DOWN'],
                            KEY_BINDINGS['RIGHT'],
                            KEY_BINDINGS['LEFT']
                        ):
                            asyncio.create_task(handle_gain_change_async(key, cdsp, tm))


                        elif key == KEY_BINDINGS['BACK']:
                            await change_config(cdsp, CONFIG_DIR + '_*')

                        elif key == KEY_BINDINGS['HOMEPAGE']:
                            await change_config(cdsp, CONFIG_DIR + '|*')


                        elif key == KEY_BINDINGS['ENTER']:
                            if not_volume_displayed:
                                bass_params, treble_params = get_bass_treble(config_active, mode="parameters")
                                if bass_params is not None and treble_params is not None:
                                    if bass_params['gain'] == treble_params['gain'] == 0:
                                        bass_params['gain'], treble_params['gain'] = bass_gain_prev, treble_gain_prev
                                        bass_gain_now, treble_gain_now = bass_gain_prev, treble_gain_prev
                                        bass_gain_prev, treble_gain_prev = 0, 0
                                    else:
                                        bass_gain_prev, treble_gain_prev = bass_params['gain'], treble_params['gain']
                                        bass_params['gain'] = treble_params['gain'] = 0
                                        bass_gain_now = treble_gain_now = 0

                                    cdsp.config.set_active(config_active)
                                    display_tone_info()
                            else:
                                display_tone_info()



                    if attrib.keystate == 2:  # Key held down

                        if key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
                            is_volume_key_held = True
                            current_time = time.time()
                            current_volume = cdsp.volume.main_volume()
                            is_muted = cdsp.volume.main_mute()

                            if key == KEY_BINDINGS['VOLUMEUP']:
                                direction = "up"
                                exponent = 2.0
                                pivot=-60
                            else:
                                direction = "down"
                                exponent = 2.0
                                pivot=-40
                            repeat_speed = get_repeat_speed(current_volume, direction, exponent=exponent, pivot=pivot)

                            if current_time - last_repeat_time >= repeat_speed:
                                step = 1
                                change_volume_from_key(cdsp, key, current_volume, is_muted, volume_step=step)
                                last_repeat_time = current_time

                        elif key == KEY_BINDINGS['PLAYPAUSE']:
                            # Stop LMS
                            key_hold_counter += 1
                            if key_hold_counter == 10:
                                send_lms_command("stop")

                        elif key == KEY_BINDINGS['POWER']:
                            key_hold_counter += 1
                            if key_hold_counter == 10:
                                await toggle_power()
                            elif key_hold_counter == 40:
                                tm.write(swap(tm.encode_string(" HALT ")))
                                os.system("sudo shutdown -h now")

                        elif key in (
                            KEY_BINDINGS['UP'],
                            KEY_BINDINGS['DOWN'],
                            KEY_BINDINGS['RIGHT'],
                            KEY_BINDINGS['LEFT']
                        ):
                            repeat_throttle += 1
                            if repeat_throttle >= 4:
                                asyncio.create_task(handle_gain_change_async(key, cdsp, tm))
                                repeat_throttle = 0

                    if attrib.keystate == 0:  # Key released

                        if KEY_BINDINGS['MUTE'] in key:
                            # Toggle mute status
                            if key_hold_counter < 20:
                                is_muted = not cdsp.volume.main_mute()
                                cdsp.volume.set_main_mute(is_muted)
                                current_volume = cdsp.volume.main_volume()
                                display_volume_info(current_volume, is_muted)

                        elif key == KEY_BINDINGS['PLAYPAUSE'] and key_hold_counter < 10:
                            send_lms_command("pause")

                        elif key == KEY_BINDINGS['POWER'] and key_hold_counter < 10:
                            # Adjust screen brightness on POWER key release
                            if br_direction == 1:
                                if tm.brightness() < 2:
                                    tm.brightness(tm.brightness() + 1)
                                else:
                                    tm.brightness(tm.brightness() - 1)
                                    br_direction = -1
                            else:
                                if tm.brightness() > 0:
                                    tm.brightness(tm.brightness() - 1)
                                else:
                                    tm.brightness(tm.brightness() + 1)
                                    br_direction = 1

                        elif key in (KEY_BINDINGS['VOLUMEDOWN'], KEY_BINDINGS['VOLUMEUP']):
                            display_refresh_event.set()

                        key_hold_counter = 0
                        repeat_throttle = 0
                        last_repeat_time = 0


        except OSError as e:
            print(f"❌ Critical error: {e}. Restarting the script...")
            python = sys.executable
            os.execv(python, [python] + sys.argv)  # Replaces the current process


def exit_gracefully(signal, frame):
    """Gracefully shuts down the service, including cleanup of GPIO resources."""
    try:
        lgpio.gpio_write(h, POWER_GPIO, lgpio.LOW)
        time.sleep(0.5)
        # Properly close GPIO resources
        lgpio.gpiochip_close(h)

        # Exit with a success code
        print("Gracefully shutting down the service...")
        sys.exit(0)
    except Exception as e:
        # If there's an error during cleanup, print the error and exit with a failure code
        print(f"Error during cleanup: {e}")
        sys.exit(1)

# Register the signal handler for clean shutdown
signal.signal(signal.SIGINT, exit_gracefully)  # Handle Ctrl+C (SIGINT)
signal.signal(signal.SIGTERM, exit_gracefully)  # Handle termination signal (SIGTERM)


# Show startup animation and current volume info

for i in range(11):
    tm.write(swap(tm.encode_string(("      -".ljust(16))[i:i+6])))
    time.sleep(0.3)
display_volume_info()


loop = asyncio.get_event_loop()
loop.create_task(auto_poweroff())
loop.create_task(display_manager_loop())

for device in remote, knob:
    asyncio.ensure_future(remote_events(device))

loop.run_forever()

#EOF
 
Last edited:
ESP32-C3 remote power controller for Raspberry Pi – Power your Pi on or off using the Bluetooth remote. Includes automatic power status detection and smart Bluetooth reconnection.

Requires adding to /boot/firmware/config.txt :
dtoverlay=gpio-poweroff,gpiopin=21,active_low=1
dtoverlay=gpio-shutdown,gpio_pin=20,active_low=0,gpio_pull=down
 

Attachments

  • Like
Reactions: MCH
Back
Top Bottom