from flask import Flask, render_template, request, jsonify import asyncio from bleak import BleakScanner, BleakClient import threading import time import json import sys import signal import os # ================================================================================================= # APP CONFIGURATION # ================================================================================================= # Set to True to run without a physical BLE device for testing purposes. # Set to False to connect to the actual lamp matrix. DEBUG_MODE = True # --- BLE Device Configuration (Ignored in DEBUG_MODE) --- DEVICE_NAME = "Pupilometer LED Billboard" global ble_client global ble_characteristics ble_client = None ble_characteristics = None ble_event_loop = None # Will be initialized if not in debug mode # ================================================================================================= # BLE HELPER FUNCTIONS (Used in LIVE mode) # ================================================================================================= lampAmount = 25 def create_spiral_map(n=5): if n % 2 == 0: raise ValueError("Matrix size must be odd for a unique center point.") spiral_map = [[0] * n for _ in range(n)] r, c = n // 2, n // 2 address = 0 spiral_map[r][c] = address # Updated directions to start moving UP first instead of right dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left direction = 0 segment_length = 1 steps = 0 while address < n * n - 1: for _ in range(segment_length): address += 1 r += dr[direction] c += dc[direction] if 0 <= r < n and 0 <= c < n: spiral_map[r][c] = address direction = (direction + 1) % 4 steps += 1 if steps % 2 == 0: segment_length += 1 return spiral_map def get_spiral_address(row, col, spiral_map): n = len(spiral_map) if 0 <= row < n and 0 <= col < n: return spiral_map[row][col] else: return -1 SPIRAL_MAP_5x5 = create_spiral_map(5) async def set_full_matrix_on_ble(colorSeries): global ble_client global ble_characteristics if not ble_client or not ble_client.is_connected: print("BLE client not connected. Attempting to reconnect...") await connect_to_ble_device() if not ble_client or not ble_client.is_connected: print("Failed to reconnect to BLE client.") return else: print("Confirmed BLE connection status. Proceeding with lamp update.") # ===================================================================== # SNIPPET TO PATCH SWAPPED LAMP POSITIONS # ===================================================================== print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") # Swap data for lamps at positions 3 and 7 temp_color_3 = colorSeries[3] colorSeries[3] = colorSeries[7] colorSeries[7] = temp_color_3 # Swap data for lamps at positions 12 and 24 temp_color_12 = colorSeries[12] colorSeries[12] = colorSeries[24] colorSeries[24] = temp_color_12 # ===================================================================== # Ensure all characteristics are available before writing if len(ble_characteristics) != lampAmount: print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.") return print(f"Constructed the following matrix data: {colorSeries}") # Write each byte string to its corresponding characteristic for i, char in enumerate(ble_characteristics): value_to_write = colorSeries[i] print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}") await ble_client.write_gatt_char(char.uuid, value_to_write) async def connect_to_ble_device(): global ble_client global ble_characteristics print(f"Scanning for device: {DEVICE_NAME}...") devices = await BleakScanner.discover() target_device = next((d for d in devices if d.name == DEVICE_NAME), None) if not target_device: print(f"Device '{DEVICE_NAME}' not found.") return False print(f"Found device: {target_device.name} ({target_device.address})") try: ble_client = BleakClient(target_device.address) await ble_client.connect() if ble_client.is_connected: print(f"Connected to {target_device.name}") services = [service for service in ble_client.services if service.handle != 1] # The previous logic for filtering services seems incorrect; let's grab all characteristics characteristics = [ char for service in services for char in service.characteristics ] ble_characteristics = sorted(characteristics, key=lambda char: char.handle) print(f"Found {len(ble_characteristics)} characteristics for lamps.") return True else: print(f"Failed to connect to {target_device.name}") return False except Exception as e: print(f"An error occurred during BLE connection: {e}") return False # ================================================================================================= # COLOR MIXING # ================================================================================================= def calculate_rgb(ww, cw, blue): """ Calculates the combined RGB color from warm white, cool white, and blue light values. This function is a Python equivalent of the JavaScript color mixer in index.html. """ # Define the RGB components for each light source based on slider track colors warm_white_r, warm_white_g, warm_white_b = 255, 192, 128 cool_white_r, cool_white_g, cool_white_b = 192, 224, 255 blue_r, blue_g, blue_b = 0, 0, 255 # Normalize the slider values (0-255) and apply them to the base colors r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b # Clamp the values to 255 and convert to integer r = int(min(255, round(r))) g = int(min(255, round(g))) b = int(min(255, round(b))) return r, g, b def rgb_to_hex(r, g, b): """ Converts RGB color values to a hex color string. """ # Ensure values are within the valid range (0-255) and are integers r = int(max(0, min(255, r))) g = int(max(0, min(255, g))) b = int(max(0, min(255, b))) # Convert each component to a two-digit hexadecimal string return f'#{r:02x}{g:02x}{b:02x}' # ================================================================================================= # FLASK APPLICATION # ================================================================================================= app = Flask(__name__) # In-memory matrix for DEBUG_MODE lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)] @app.route('/') def index(): print(f"Getting current lamp matrix info: {lamp_matrix}") if DEBUG_MODE: return render_template('index.html', matrix=lamp_matrix) else: # In live mode, we'll pass a default black matrix. initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)] return render_template('index.html', matrix=initial_matrix) @app.route('/set_matrix', methods=['POST']) def set_matrix(): data = request.get_json() full_matrix = data.get('matrix', []) matrixDataSerialized = [] if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5: return jsonify(success=False, message="Invalid matrix data received"), 400 else: print(f"Received the following matrix data: {full_matrix}") #Creating empty byte array serial_colors = [b'\x00\x00\x00'] * lampAmount try: for row in range(5): for col in range(5): lamp_data = full_matrix[row][col] ww = int(lamp_data['ww']) cw = int(lamp_data['cw']) blue = int(lamp_data['blue']) #Preparing byte data for control command color_bytes = bytes([ww, cw, blue]) spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5) print(f"Constructed data for {spiral_pos}: {color_bytes}") if spiral_pos != -1: serial_colors[spiral_pos] = color_bytes #Preparing hex color data for frontend lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue) lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB) if DEBUG_MODE: # === DEBUG MODE: Update in-memory matrix === return jsonify(success=True) else: # === LIVE MODE: Communicate with the BLE device === asyncio.run_coroutine_threadsafe( set_full_matrix_on_ble(serial_colors), ble_event_loop ) return jsonify(success=True) except Exception as e: print(f"Error in set_matrix route: {e}") return jsonify(success=False, message=str(e)), 500 print(f"Getting current lamp matrix info: {lamp_matrix}") # ================================================================================================= # APP STARTUP # ================================================================================================= def signal_handler(signum, frame): print("Received shutdown signal, gracefully shutting down...") if not DEBUG_MODE and ble_client and ble_client.is_connected: print("Disconnecting BLE client...") disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) try: # Wait for the disconnect to complete with a timeout disconnect_future.result(timeout=5) print("BLE client disconnected successfully.") except Exception as e: print(f"Error during BLE disconnect: {e}") if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running(): print("Stopping BLE event loop...") # Schedule a stop and wait for the thread to finish ble_event_loop.call_soon_threadsafe(ble_event_loop.stop) ble_thread.join(timeout=1) print("BLE event loop stopped.") os._exit(0) if __name__ == '__main__': # Register the signal handler before running the app signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) if not DEBUG_MODE: print("Starting BLE event loop in background thread...") ble_event_loop = asyncio.new_event_loop() ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True) ble_thread.start() # Connect to the device as soon as the app starts future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop) future.result(timeout=10) # Wait up to 10 seconds for connection app.run(debug=True, use_reloader=False, host="0.0.0.0")