diff --git a/src/controllerSoftware/app.py b/src/controllerSoftware/app.py index e6534f55..0d5dfd4d 100644 --- a/src/controllerSoftware/app.py +++ b/src/controllerSoftware/app.py @@ -1,343 +1,345 @@ -from flask import Flask, render_template, request, jsonify -import asyncio -from bleak import BleakScanner, BleakClient -import threading -import time -import json -import sys -import signal -import os -from vision import VisionSystem - -# ================================================================================================= -# APP CONFIGURATION -# ================================================================================================= - -# Set to True to run without a physical BLE device for testing purposes. -# Set to False to connect to the actual lamp matrix. -DEBUG_MODE = True - -# --- BLE Device Configuration (Ignored in DEBUG_MODE) --- -DEVICE_NAME = "Pupilometer LED Billboard" -global ble_client -global ble_characteristics -ble_client = None -ble_characteristics = None -ble_event_loop = None # Will be initialized if not in debug mode -ble_connection_status = False - -# ================================================================================================= -# BLE HELPER FUNCTIONS (Used in LIVE mode) -# ================================================================================================= - -lampAmount = 25 - -def create_spiral_map(n=5): - if n % 2 == 0: - raise ValueError("Matrix size must be odd for a unique center point.") - spiral_map = [[0] * n for _ in range(n)] - r, c = n // 2, n // 2 - address = 0 - spiral_map[r][c] = address - - # Updated directions to start moving UP first instead of right - dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left - dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left - - direction = 0 - segment_length = 1 - steps = 0 - while address < n * n - 1: - for _ in range(segment_length): - address += 1 - r += dr[direction] - c += dc[direction] - if 0 <= r < n and 0 <= c < n: - spiral_map[r][c] = address - direction = (direction + 1) % 4 - steps += 1 - if steps % 2 == 0: - segment_length += 1 - return spiral_map - - -def get_spiral_address(row, col, spiral_map): - n = len(spiral_map) - if 0 <= row < n and 0 <= col < n: - return spiral_map[row][col] - else: - return -1 - -SPIRAL_MAP_5x5 = create_spiral_map(5) - -async def set_full_matrix_on_ble(colorSeries): - global ble_client - global ble_characteristics - - if not ble_client or not ble_client.is_connected: - print("BLE client not connected. Attempting to reconnect...") - await connect_to_ble_device() - if not ble_client or not ble_client.is_connected: - print("Failed to reconnect to BLE client.") - return - else: - print("Confirmed BLE connection status. Proceeding with lamp update.") - - # ===================================================================== - # SNIPPET TO PATCH SWAPPED LAMP POSITIONS - # ===================================================================== - print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") - - # Swap data for lamps at positions 3 and 7 - temp_color_3 = colorSeries[3] - colorSeries[3] = colorSeries[7] - colorSeries[7] = temp_color_3 - - # Swap data for lamps at positions 12 and 24 - temp_color_12 = colorSeries[12] - colorSeries[12] = colorSeries[24] - colorSeries[24] = temp_color_12 - # ===================================================================== - - if DEBUG_MODE: - # Ensure all characteristics are available before writing - print(f"Confirmed DEBUG set to true.") - if len(ble_characteristics) != lampAmount: - print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.") - return - print(f"Constructed the following matrix data: {colorSeries}") - # Write each byte string to its corresponding characteristic - for i, char in enumerate(ble_characteristics): - value_to_write = colorSeries[i] - print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}") - await ble_client.write_gatt_char(char.uuid, value_to_write) - else: - print(f"Confirmed DEBUG set to false.") - value_to_write = b"".join([color for color in colorSeries]) - print(value_to_write) - print(f"Setting lamps to {value_to_write.hex()}") - await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write) - - -async def connect_to_ble_device(): - global ble_client - global ble_characteristics - global ble_connection_status - - print(f"Scanning for device: {DEVICE_NAME}...") - devices = await BleakScanner.discover() - target_device = next((d for d in devices if d.name == DEVICE_NAME), None) - - if not target_device: - print(f"Device '{DEVICE_NAME}' not found.") - ble_connection_status = False - return False - - print(f"Found device: {target_device.name} ({target_device.address})") - - try: - ble_client = BleakClient(target_device.address) - await ble_client.connect() - if ble_client.is_connected: - print(f"Connected to {target_device.name}") - services = [service for service in ble_client.services if service.handle != 1] - - # The previous logic for filtering services seems incorrect; let's grab all characteristics - characteristics = [ - char for service in services for char in service.characteristics - ] - ble_characteristics = sorted(characteristics, key=lambda char: char.handle) - print(f"Found {len(ble_characteristics)} characteristics for lamps.") - ble_connection_status = True - return True - else: - print(f"Failed to connect to {target_device.name}") - ble_connection_status = False - return False - except Exception as e: - print(f"An error occurred during BLE connection: {e}") - ble_connection_status = False - return False -# ================================================================================================= -# COLOR MIXING -# ================================================================================================= - -def calculate_rgb(ww, cw, blue): - """ - Calculates the combined RGB color from warm white, cool white, and blue light values. - This function is a Python equivalent of the JavaScript color mixer in index.html. - """ - # Define the RGB components for each light source based on slider track colors - warm_white_r, warm_white_g, warm_white_b = 255, 192, 128 - cool_white_r, cool_white_g, cool_white_b = 192, 224, 255 - blue_r, blue_g, blue_b = 0, 0, 255 - - # Normalize the slider values (0-255) and apply them to the base colors - r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r - g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g - b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b - - # Clamp the values to 255 and convert to integer - r = int(min(255, round(r))) - g = int(min(255, round(g))) - b = int(min(255, round(b))) - - return r, g, b - -def rgb_to_hex(r, g, b): - """ - Converts RGB color values to a hex color string. - """ - # Ensure values are within the valid range (0-255) and are integers - r = int(max(0, min(255, r))) - g = int(max(0, min(255, g))) - b = int(max(0, min(255, b))) - - # Convert each component to a two-digit hexadecimal string - return f'#{r:02x}{g:02x}{b:02x}' - -# ================================================================================================= -# FLASK APPLICATION -# ================================================================================================= - -app = Flask(__name__) - -# In-memory matrix for DEBUG_MODE -lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)] - -@app.route('/') -def index(): - print(f"Getting current lamp matrix info: {lamp_matrix}") - if DEBUG_MODE: - return render_template('index.html', matrix=lamp_matrix) - else: - # In live mode, we'll pass a default black matrix. - initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)] - return render_template('index.html', matrix=initial_matrix) - - -@app.route('/set_matrix', methods=['POST']) -def set_matrix(): - data = request.get_json() - full_matrix = data.get('matrix', []) - matrixDataSerialized = [] - if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5: - return jsonify(success=False, message="Invalid matrix data received"), 400 - else: - print(f"Received the following matrix data: {full_matrix}") - - #Creating empty byte array - serial_colors = [b'\x00\x00\x00'] * lampAmount - - try: - for row in range(5): - for col in range(5): - lamp_data = full_matrix[row][col] - ww = int(lamp_data['ww']) - cw = int(lamp_data['cw']) - blue = int(lamp_data['blue']) - - #Preparing byte data for control command - color_bytes = bytes([ww, cw, blue]) - spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5) - print(f"Constructed data for {spiral_pos}: {color_bytes}") - if spiral_pos != -1: - serial_colors[spiral_pos] = color_bytes - #Preparing hex color data for frontend - lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue) - lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB) - if DEBUG_MODE: - # === DEBUG MODE: Update in-memory matrix === - return jsonify(success=True) - else: - # === LIVE MODE: Communicate with the BLE device === - asyncio.run_coroutine_threadsafe( - set_full_matrix_on_ble(serial_colors), - ble_event_loop - ) - return jsonify(success=True) - except Exception as e: - print(f"Error in set_matrix route: {e}") - return jsonify(success=False, message=str(e)), 500 - - print(f"Getting current lamp matrix info: {lamp_matrix}") - -@app.route('/ble_status') -def ble_status(): - global ble_connection_status - if DEBUG_MODE: - return jsonify(connected=True) - return jsonify(connected=ble_connection_status) - -@app.route('/vision/pupil_data') -def get_pupil_data(): - """ - Endpoint to get the latest pupil segmentation data from the vision system. - """ - if vision_system: - data = vision_system.get_pupil_data() - return jsonify(success=True, data=data) - return jsonify(success=False, message="Vision system not initialized"), 500 - -# ================================================================================================= -# APP STARTUP -# ================================================================================================= - -vision_system = None - -def signal_handler(signum, frame): - print("Received shutdown signal, gracefully shutting down...") - global ble_connection_status - - # Stop the vision system - if vision_system: - print("Stopping vision system...") - vision_system.stop() - print("Vision system stopped.") - - if not DEBUG_MODE and ble_client and ble_client.is_connected: - print("Disconnecting BLE client...") - ble_connection_status = False - disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) - try: - # Wait for the disconnect to complete with a timeout - disconnect_future.result(timeout=5) - print("BLE client disconnected successfully.") - except Exception as e: - print(f"Error during BLE disconnect: {e}") - - if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running(): - print("Stopping BLE event loop...") - # Schedule a stop and wait for the thread to finish - ble_event_loop.call_soon_threadsafe(ble_event_loop.stop) - ble_thread.join(timeout=1) - print("BLE event loop stopped.") - - os._exit(0) - -if __name__ == '__main__': - # Register the signal handler before running the app - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Initialize and start the Vision System - try: - vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"} - vision_system = VisionSystem(config=vision_config) - vision_system.start() - except Exception as e: - print(f"Failed to initialize or start Vision System: {e}") - vision_system = None - - - if not DEBUG_MODE: - print("Starting BLE event loop in background thread...") - ble_event_loop = asyncio.new_event_loop() - ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True) - ble_thread.start() - - # Connect to the device as soon as the app starts - future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop) - future.result(timeout=10) # Wait up to 10 seconds for connection - - app.run(debug=True, use_reloader=False, host="0.0.0.0") +from flask import Flask, render_template, request, jsonify +import asyncio +from bleak import BleakScanner, BleakClient +import threading +import time +import json +import sys +import signal +import os +from vision import VisionSystem + +# ================================================================================================= +# APP CONFIGURATION +# ================================================================================================= + +# Set to True to run without a physical BLE device for testing purposes. +# Set to False to connect to the actual lamp matrix. +DEBUG_MODE = True + +# --- BLE Device Configuration (Ignored in DEBUG_MODE) --- +DEVICE_NAME = "Pupilometer LED Billboard" +global ble_client +global ble_characteristics +global ble_connection_status +ble_client = None +ble_characteristics = None +ble_event_loop = None # Will be initialized if not in debug mode +ble_connection_status = False + +# ================================================================================================= +# BLE HELPER FUNCTIONS (Used in LIVE mode) +# ================================================================================================= + +lampAmount = 25 + +def create_spiral_map(n=5): + if n % 2 == 0: + raise ValueError("Matrix size must be odd for a unique center point.") + spiral_map = [[0] * n for _ in range(n)] + r, c = n // 2, n // 2 + address = 0 + spiral_map[r][c] = address + + # Updated directions to start moving UP first instead of right + dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left + dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left + + direction = 0 + segment_length = 1 + steps = 0 + while address < n * n - 1: + for _ in range(segment_length): + address += 1 + r += dr[direction] + c += dc[direction] + if 0 <= r < n and 0 <= c < n: + spiral_map[r][c] = address + direction = (direction + 1) % 4 + steps += 1 + if steps % 2 == 0: + segment_length += 1 + return spiral_map + + +def get_spiral_address(row, col, spiral_map): + n = len(spiral_map) + if 0 <= row < n and 0 <= col < n: + return spiral_map[row][col] + else: + return -1 + +SPIRAL_MAP_5x5 = create_spiral_map(5) + +async def set_full_matrix_on_ble(colorSeries): + global ble_client + global ble_characteristics + global ble_connection_status + + if not ble_client or not ble_client.is_connected: + print("BLE client not connected. Attempting to reconnect...") + await connect_to_ble_device() + if not ble_client or not ble_client.is_connected: + print("Failed to reconnect to BLE client.") + return + else: + print("Confirmed BLE connection status. Proceeding with lamp update.") + + # ===================================================================== + # SNIPPET TO PATCH SWAPPED LAMP POSITIONS + # ===================================================================== + #print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") + + # Swap data for lamps at positions 3 and 7 + #temp_color_3 = colorSeries[3] + #colorSeries[3] = colorSeries[7] + #colorSeries[7] = temp_color_3 + + # Swap data for lamps at positions 12 and 24 + #temp_color_12 = colorSeries[12] + #colorSeries[12] = colorSeries[24] + #colorSeries[24] = temp_color_12 + # ===================================================================== + + if DEBUG_MODE: + # Ensure all characteristics are available before writing + print(f"Confirmed DEBUG set to true.") + if len(ble_characteristics) != lampAmount: + print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.") + return + print(f"Constructed the following matrix data: {colorSeries}") + # Write each byte string to its corresponding characteristic + for i, char in enumerate(ble_characteristics): + value_to_write = colorSeries[i] + print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}") + await ble_client.write_gatt_char(char.uuid, value_to_write) + else: + print(f"Confirmed DEBUG set to false.") + value_to_write = b"".join([color for color in colorSeries]) + print(value_to_write) + print(f"Setting lamps to {value_to_write.hex()}") + await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write) + + +async def connect_to_ble_device(): + global ble_client + global ble_characteristics + global ble_connection_status + + print(f"Scanning for device: {DEVICE_NAME}...") + devices = await BleakScanner.discover() + target_device = next((d for d in devices if d.name == DEVICE_NAME), None) + + if not target_device: + print(f"Device '{DEVICE_NAME}' not found.") + ble_connection_status = False + return False + + print(f"Found device: {target_device.name} ({target_device.address})") + + try: + ble_client = BleakClient(target_device.address) + await ble_client.connect() + if ble_client.is_connected: + print(f"Connected to {target_device.name}") + services = [service for service in ble_client.services if service.handle != 1] + + # The previous logic for filtering services seems incorrect; let's grab all characteristics + characteristics = [ + char for service in services for char in service.characteristics + ] + ble_characteristics = sorted(characteristics, key=lambda char: char.handle) + print(f"Found {len(ble_characteristics)} characteristics for lamps.") + ble_connection_status = True + return True + else: + print(f"Failed to connect to {target_device.name}") + ble_connection_status = False + return False + except Exception as e: + print(f"An error occurred during BLE connection: {e}") + ble_connection_status = False + return False +# ================================================================================================= +# COLOR MIXING +# ================================================================================================= + +def calculate_rgb(ww, cw, blue): + """ + Calculates the combined RGB color from warm white, cool white, and blue light values. + This function is a Python equivalent of the JavaScript color mixer in index.html. + """ + # Define the RGB components for each light source based on slider track colors + warm_white_r, warm_white_g, warm_white_b = 255, 192, 128 + cool_white_r, cool_white_g, cool_white_b = 192, 224, 255 + blue_r, blue_g, blue_b = 0, 0, 255 + + # Normalize the slider values (0-255) and apply them to the base colors + r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r + g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g + b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b + + # Clamp the values to 255 and convert to integer + r = int(min(255, round(r))) + g = int(min(255, round(g))) + b = int(min(255, round(b))) + + return r, g, b + +def rgb_to_hex(r, g, b): + """ + Converts RGB color values to a hex color string. + """ + # Ensure values are within the valid range (0-255) and are integers + r = int(max(0, min(255, r))) + g = int(max(0, min(255, g))) + b = int(max(0, min(255, b))) + + # Convert each component to a two-digit hexadecimal string + return f'#{r:02x}{g:02x}{b:02x}' + +# ================================================================================================= +# FLASK APPLICATION +# ================================================================================================= + +app = Flask(__name__) + +# In-memory matrix for DEBUG_MODE +lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)] + +@app.route('/') +def index(): + print(f"Getting current lamp matrix info: {lamp_matrix}") + if DEBUG_MODE: + return render_template('index.html', matrix=lamp_matrix) + else: + # In live mode, we'll pass a default black matrix. + initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)] + return render_template('index.html', matrix=initial_matrix) + + +@app.route('/set_matrix', methods=['POST']) +def set_matrix(): + data = request.get_json() + full_matrix = data.get('matrix', []) + matrixDataSerialized = [] + if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5: + return jsonify(success=False, message="Invalid matrix data received"), 400 + else: + print(f"Received the following matrix data: {full_matrix}") + + #Creating empty byte array + serial_colors = [b'\x00\x00\x00'] * lampAmount + + try: + for row in range(5): + for col in range(5): + lamp_data = full_matrix[row][col] + ww = int(lamp_data['ww']) + cw = int(lamp_data['cw']) + blue = int(lamp_data['blue']) + + #Preparing byte data for control command + color_bytes = bytes([ww, cw, blue]) + spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5) + print(f"Constructed data for {spiral_pos}: {color_bytes}") + if spiral_pos != -1: + serial_colors[spiral_pos] = color_bytes + #Preparing hex color data for frontend + lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue) + lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB) + if DEBUG_MODE: + # === DEBUG MODE: Update in-memory matrix === + return jsonify(success=True) + else: + # === LIVE MODE: Communicate with the BLE device === + asyncio.run_coroutine_threadsafe( + set_full_matrix_on_ble(serial_colors), + ble_event_loop + ) + return jsonify(success=True) + except Exception as e: + print(f"Error in set_matrix route: {e}") + return jsonify(success=False, message=str(e)), 500 + + print(f"Getting current lamp matrix info: {lamp_matrix}") + +@app.route('/ble_status') +def ble_status(): + global ble_connection_status + if DEBUG_MODE: + return jsonify(connected=True) + return jsonify(connected=ble_connection_status) + +@app.route('/vision/pupil_data') +def get_pupil_data(): + """ + Endpoint to get the latest pupil segmentation data from the vision system. + """ + if vision_system: + data = vision_system.get_pupil_data() + return jsonify(success=True, data=data) + return jsonify(success=False, message="Vision system not initialized"), 500 + +# ================================================================================================= +# APP STARTUP +# ================================================================================================= + +vision_system = None + +def signal_handler(signum, frame): + print("Received shutdown signal, gracefully shutting down...") + global ble_connection_status + + # Stop the vision system + if vision_system: + print("Stopping vision system...") + vision_system.stop() + print("Vision system stopped.") + + if not DEBUG_MODE and ble_client and ble_client.is_connected: + print("Disconnecting BLE client...") + ble_connection_status = False + disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) + try: + # Wait for the disconnect to complete with a timeout + disconnect_future.result(timeout=5) + print("BLE client disconnected successfully.") + except Exception as e: + print(f"Error during BLE disconnect: {e}") + + if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running(): + print("Stopping BLE event loop...") + # Schedule a stop and wait for the thread to finish + ble_event_loop.call_soon_threadsafe(ble_event_loop.stop) + ble_thread.join(timeout=1) + print("BLE event loop stopped.") + + os._exit(0) + +if __name__ == '__main__': + # Register the signal handler before running the app + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Initialize and start the Vision System + try: + vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"} + vision_system = VisionSystem(config=vision_config) + vision_system.start() + except Exception as e: + print(f"Failed to initialize or start Vision System: {e}") + vision_system = None + + + if not DEBUG_MODE: + print("Starting BLE event loop in background thread...") + ble_event_loop = asyncio.new_event_loop() + ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True) + ble_thread.start() + + # Connect to the device as soon as the app starts + future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop) + future.result(timeout=10) # Wait up to 10 seconds for connection + + app.run(debug=True, use_reloader=False, host="0.0.0.0") \ No newline at end of file