Resolve merge conflict in app.py and integrate vision system

This commit is contained in:
Tempest 2025-11-27 22:23:57 +07:00
commit 8aebeea6ee

View File

@ -1,343 +1,345 @@
from flask import Flask, render_template, request, jsonify from flask import Flask, render_template, request, jsonify
import asyncio import asyncio
from bleak import BleakScanner, BleakClient from bleak import BleakScanner, BleakClient
import threading import threading
import time import time
import json import json
import sys import sys
import signal import signal
import os import os
from vision import VisionSystem from vision import VisionSystem
# ================================================================================================= # =================================================================================================
# APP CONFIGURATION # APP CONFIGURATION
# ================================================================================================= # =================================================================================================
# Set to True to run without a physical BLE device for testing purposes. # Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix. # Set to False to connect to the actual lamp matrix.
DEBUG_MODE = True DEBUG_MODE = True
# --- BLE Device Configuration (Ignored in DEBUG_MODE) --- # --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
DEVICE_NAME = "Pupilometer LED Billboard" DEVICE_NAME = "Pupilometer LED Billboard"
global ble_client global ble_client
global ble_characteristics global ble_characteristics
ble_client = None global ble_connection_status
ble_characteristics = None ble_client = None
ble_event_loop = None # Will be initialized if not in debug mode ble_characteristics = None
ble_connection_status = False ble_event_loop = None # Will be initialized if not in debug mode
ble_connection_status = False
# =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode) # =================================================================================================
# ================================================================================================= # BLE HELPER FUNCTIONS (Used in LIVE mode)
# =================================================================================================
lampAmount = 25
lampAmount = 25
def create_spiral_map(n=5):
if n % 2 == 0: def create_spiral_map(n=5):
raise ValueError("Matrix size must be odd for a unique center point.") if n % 2 == 0:
spiral_map = [[0] * n for _ in range(n)] raise ValueError("Matrix size must be odd for a unique center point.")
r, c = n // 2, n // 2 spiral_map = [[0] * n for _ in range(n)]
address = 0 r, c = n // 2, n // 2
spiral_map[r][c] = address address = 0
spiral_map[r][c] = address
# Updated directions to start moving UP first instead of right
dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left # Updated directions to start moving UP first instead of right
dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left
dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left
direction = 0
segment_length = 1 direction = 0
steps = 0 segment_length = 1
while address < n * n - 1: steps = 0
for _ in range(segment_length): while address < n * n - 1:
address += 1 for _ in range(segment_length):
r += dr[direction] address += 1
c += dc[direction] r += dr[direction]
if 0 <= r < n and 0 <= c < n: c += dc[direction]
spiral_map[r][c] = address if 0 <= r < n and 0 <= c < n:
direction = (direction + 1) % 4 spiral_map[r][c] = address
steps += 1 direction = (direction + 1) % 4
if steps % 2 == 0: steps += 1
segment_length += 1 if steps % 2 == 0:
return spiral_map segment_length += 1
return spiral_map
def get_spiral_address(row, col, spiral_map):
n = len(spiral_map) def get_spiral_address(row, col, spiral_map):
if 0 <= row < n and 0 <= col < n: n = len(spiral_map)
return spiral_map[row][col] if 0 <= row < n and 0 <= col < n:
else: return spiral_map[row][col]
return -1 else:
return -1
SPIRAL_MAP_5x5 = create_spiral_map(5)
SPIRAL_MAP_5x5 = create_spiral_map(5)
async def set_full_matrix_on_ble(colorSeries):
global ble_client async def set_full_matrix_on_ble(colorSeries):
global ble_characteristics global ble_client
global ble_characteristics
if not ble_client or not ble_client.is_connected: global ble_connection_status
print("BLE client not connected. Attempting to reconnect...")
await connect_to_ble_device() if not ble_client or not ble_client.is_connected:
if not ble_client or not ble_client.is_connected: print("BLE client not connected. Attempting to reconnect...")
print("Failed to reconnect to BLE client.") await connect_to_ble_device()
return if not ble_client or not ble_client.is_connected:
else: print("Failed to reconnect to BLE client.")
print("Confirmed BLE connection status. Proceeding with lamp update.") return
else:
# ===================================================================== print("Confirmed BLE connection status. Proceeding with lamp update.")
# SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# ===================================================================== # =====================================================================
print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") # SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# =====================================================================
# Swap data for lamps at positions 3 and 7 #print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
temp_color_3 = colorSeries[3]
colorSeries[3] = colorSeries[7] # Swap data for lamps at positions 3 and 7
colorSeries[7] = temp_color_3 #temp_color_3 = colorSeries[3]
#colorSeries[3] = colorSeries[7]
# Swap data for lamps at positions 12 and 24 #colorSeries[7] = temp_color_3
temp_color_12 = colorSeries[12]
colorSeries[12] = colorSeries[24] # Swap data for lamps at positions 12 and 24
colorSeries[24] = temp_color_12 #temp_color_12 = colorSeries[12]
# ===================================================================== #colorSeries[12] = colorSeries[24]
#colorSeries[24] = temp_color_12
if DEBUG_MODE: # =====================================================================
# Ensure all characteristics are available before writing
print(f"Confirmed DEBUG set to true.") if DEBUG_MODE:
if len(ble_characteristics) != lampAmount: # Ensure all characteristics are available before writing
print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.") print(f"Confirmed DEBUG set to true.")
return if len(ble_characteristics) != lampAmount:
print(f"Constructed the following matrix data: {colorSeries}") print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.")
# Write each byte string to its corresponding characteristic return
for i, char in enumerate(ble_characteristics): print(f"Constructed the following matrix data: {colorSeries}")
value_to_write = colorSeries[i] # Write each byte string to its corresponding characteristic
print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}") for i, char in enumerate(ble_characteristics):
await ble_client.write_gatt_char(char.uuid, value_to_write) value_to_write = colorSeries[i]
else: print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}")
print(f"Confirmed DEBUG set to false.") await ble_client.write_gatt_char(char.uuid, value_to_write)
value_to_write = b"".join([color for color in colorSeries]) else:
print(value_to_write) print(f"Confirmed DEBUG set to false.")
print(f"Setting lamps to {value_to_write.hex()}") value_to_write = b"".join([color for color in colorSeries])
await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write) print(value_to_write)
print(f"Setting lamps to {value_to_write.hex()}")
await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write)
async def connect_to_ble_device():
global ble_client
global ble_characteristics async def connect_to_ble_device():
global ble_connection_status global ble_client
global ble_characteristics
print(f"Scanning for device: {DEVICE_NAME}...") global ble_connection_status
devices = await BleakScanner.discover()
target_device = next((d for d in devices if d.name == DEVICE_NAME), None) print(f"Scanning for device: {DEVICE_NAME}...")
devices = await BleakScanner.discover()
if not target_device: target_device = next((d for d in devices if d.name == DEVICE_NAME), None)
print(f"Device '{DEVICE_NAME}' not found.")
ble_connection_status = False if not target_device:
return False print(f"Device '{DEVICE_NAME}' not found.")
ble_connection_status = False
print(f"Found device: {target_device.name} ({target_device.address})") return False
try: print(f"Found device: {target_device.name} ({target_device.address})")
ble_client = BleakClient(target_device.address)
await ble_client.connect() try:
if ble_client.is_connected: ble_client = BleakClient(target_device.address)
print(f"Connected to {target_device.name}") await ble_client.connect()
services = [service for service in ble_client.services if service.handle != 1] if ble_client.is_connected:
print(f"Connected to {target_device.name}")
# The previous logic for filtering services seems incorrect; let's grab all characteristics services = [service for service in ble_client.services if service.handle != 1]
characteristics = [
char for service in services for char in service.characteristics # The previous logic for filtering services seems incorrect; let's grab all characteristics
] characteristics = [
ble_characteristics = sorted(characteristics, key=lambda char: char.handle) char for service in services for char in service.characteristics
print(f"Found {len(ble_characteristics)} characteristics for lamps.") ]
ble_connection_status = True ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
return True print(f"Found {len(ble_characteristics)} characteristics for lamps.")
else: ble_connection_status = True
print(f"Failed to connect to {target_device.name}") return True
ble_connection_status = False else:
return False print(f"Failed to connect to {target_device.name}")
except Exception as e: ble_connection_status = False
print(f"An error occurred during BLE connection: {e}") return False
ble_connection_status = False except Exception as e:
return False print(f"An error occurred during BLE connection: {e}")
# ================================================================================================= ble_connection_status = False
# COLOR MIXING return False
# ================================================================================================= # =================================================================================================
# COLOR MIXING
def calculate_rgb(ww, cw, blue): # =================================================================================================
"""
Calculates the combined RGB color from warm white, cool white, and blue light values. def calculate_rgb(ww, cw, blue):
This function is a Python equivalent of the JavaScript color mixer in index.html. """
""" Calculates the combined RGB color from warm white, cool white, and blue light values.
# Define the RGB components for each light source based on slider track colors This function is a Python equivalent of the JavaScript color mixer in index.html.
warm_white_r, warm_white_g, warm_white_b = 255, 192, 128 """
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255 # Define the RGB components for each light source based on slider track colors
blue_r, blue_g, blue_b = 0, 0, 255 warm_white_r, warm_white_g, warm_white_b = 255, 192, 128
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255
# Normalize the slider values (0-255) and apply them to the base colors blue_r, blue_g, blue_b = 0, 0, 255
r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g # Normalize the slider values (0-255) and apply them to the base colors
b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g
# Clamp the values to 255 and convert to integer b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b
r = int(min(255, round(r)))
g = int(min(255, round(g))) # Clamp the values to 255 and convert to integer
b = int(min(255, round(b))) r = int(min(255, round(r)))
g = int(min(255, round(g)))
return r, g, b b = int(min(255, round(b)))
def rgb_to_hex(r, g, b): return r, g, b
"""
Converts RGB color values to a hex color string. def rgb_to_hex(r, g, b):
""" """
# Ensure values are within the valid range (0-255) and are integers Converts RGB color values to a hex color string.
r = int(max(0, min(255, r))) """
g = int(max(0, min(255, g))) # Ensure values are within the valid range (0-255) and are integers
b = int(max(0, min(255, b))) r = int(max(0, min(255, r)))
g = int(max(0, min(255, g)))
# Convert each component to a two-digit hexadecimal string b = int(max(0, min(255, b)))
return f'#{r:02x}{g:02x}{b:02x}'
# Convert each component to a two-digit hexadecimal string
# ================================================================================================= return f'#{r:02x}{g:02x}{b:02x}'
# FLASK APPLICATION
# ================================================================================================= # =================================================================================================
# FLASK APPLICATION
app = Flask(__name__) # =================================================================================================
# In-memory matrix for DEBUG_MODE app = Flask(__name__)
lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
# In-memory matrix for DEBUG_MODE
@app.route('/') lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
def index():
print(f"Getting current lamp matrix info: {lamp_matrix}") @app.route('/')
if DEBUG_MODE: def index():
return render_template('index.html', matrix=lamp_matrix) print(f"Getting current lamp matrix info: {lamp_matrix}")
else: if DEBUG_MODE:
# In live mode, we'll pass a default black matrix. return render_template('index.html', matrix=lamp_matrix)
initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)] else:
return render_template('index.html', matrix=initial_matrix) # In live mode, we'll pass a default black matrix.
initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
return render_template('index.html', matrix=initial_matrix)
@app.route('/set_matrix', methods=['POST'])
def set_matrix():
data = request.get_json() @app.route('/set_matrix', methods=['POST'])
full_matrix = data.get('matrix', []) def set_matrix():
matrixDataSerialized = [] data = request.get_json()
if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5: full_matrix = data.get('matrix', [])
return jsonify(success=False, message="Invalid matrix data received"), 400 matrixDataSerialized = []
else: if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5:
print(f"Received the following matrix data: {full_matrix}") return jsonify(success=False, message="Invalid matrix data received"), 400
else:
#Creating empty byte array print(f"Received the following matrix data: {full_matrix}")
serial_colors = [b'\x00\x00\x00'] * lampAmount
#Creating empty byte array
try: serial_colors = [b'\x00\x00\x00'] * lampAmount
for row in range(5):
for col in range(5): try:
lamp_data = full_matrix[row][col] for row in range(5):
ww = int(lamp_data['ww']) for col in range(5):
cw = int(lamp_data['cw']) lamp_data = full_matrix[row][col]
blue = int(lamp_data['blue']) ww = int(lamp_data['ww'])
cw = int(lamp_data['cw'])
#Preparing byte data for control command blue = int(lamp_data['blue'])
color_bytes = bytes([ww, cw, blue])
spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5) #Preparing byte data for control command
print(f"Constructed data for {spiral_pos}: {color_bytes}") color_bytes = bytes([ww, cw, blue])
if spiral_pos != -1: spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5)
serial_colors[spiral_pos] = color_bytes print(f"Constructed data for {spiral_pos}: {color_bytes}")
#Preparing hex color data for frontend if spiral_pos != -1:
lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue) serial_colors[spiral_pos] = color_bytes
lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB) #Preparing hex color data for frontend
if DEBUG_MODE: lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue)
# === DEBUG MODE: Update in-memory matrix === lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB)
return jsonify(success=True) if DEBUG_MODE:
else: # === DEBUG MODE: Update in-memory matrix ===
# === LIVE MODE: Communicate with the BLE device === return jsonify(success=True)
asyncio.run_coroutine_threadsafe( else:
set_full_matrix_on_ble(serial_colors), # === LIVE MODE: Communicate with the BLE device ===
ble_event_loop asyncio.run_coroutine_threadsafe(
) set_full_matrix_on_ble(serial_colors),
return jsonify(success=True) ble_event_loop
except Exception as e: )
print(f"Error in set_matrix route: {e}") return jsonify(success=True)
return jsonify(success=False, message=str(e)), 500 except Exception as e:
print(f"Error in set_matrix route: {e}")
print(f"Getting current lamp matrix info: {lamp_matrix}") return jsonify(success=False, message=str(e)), 500
@app.route('/ble_status') print(f"Getting current lamp matrix info: {lamp_matrix}")
def ble_status():
global ble_connection_status @app.route('/ble_status')
if DEBUG_MODE: def ble_status():
return jsonify(connected=True) global ble_connection_status
return jsonify(connected=ble_connection_status) if DEBUG_MODE:
return jsonify(connected=True)
@app.route('/vision/pupil_data') return jsonify(connected=ble_connection_status)
def get_pupil_data():
""" @app.route('/vision/pupil_data')
Endpoint to get the latest pupil segmentation data from the vision system. def get_pupil_data():
""" """
if vision_system: Endpoint to get the latest pupil segmentation data from the vision system.
data = vision_system.get_pupil_data() """
return jsonify(success=True, data=data) if vision_system:
return jsonify(success=False, message="Vision system not initialized"), 500 data = vision_system.get_pupil_data()
return jsonify(success=True, data=data)
# ================================================================================================= return jsonify(success=False, message="Vision system not initialized"), 500
# APP STARTUP
# ================================================================================================= # =================================================================================================
# APP STARTUP
vision_system = None # =================================================================================================
def signal_handler(signum, frame): vision_system = None
print("Received shutdown signal, gracefully shutting down...")
global ble_connection_status def signal_handler(signum, frame):
print("Received shutdown signal, gracefully shutting down...")
# Stop the vision system global ble_connection_status
if vision_system:
print("Stopping vision system...") # Stop the vision system
vision_system.stop() if vision_system:
print("Vision system stopped.") print("Stopping vision system...")
vision_system.stop()
if not DEBUG_MODE and ble_client and ble_client.is_connected: print("Vision system stopped.")
print("Disconnecting BLE client...")
ble_connection_status = False if not DEBUG_MODE and ble_client and ble_client.is_connected:
disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) print("Disconnecting BLE client...")
try: ble_connection_status = False
# Wait for the disconnect to complete with a timeout disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop)
disconnect_future.result(timeout=5) try:
print("BLE client disconnected successfully.") # Wait for the disconnect to complete with a timeout
except Exception as e: disconnect_future.result(timeout=5)
print(f"Error during BLE disconnect: {e}") print("BLE client disconnected successfully.")
except Exception as e:
if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running(): print(f"Error during BLE disconnect: {e}")
print("Stopping BLE event loop...")
# Schedule a stop and wait for the thread to finish if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running():
ble_event_loop.call_soon_threadsafe(ble_event_loop.stop) print("Stopping BLE event loop...")
ble_thread.join(timeout=1) # Schedule a stop and wait for the thread to finish
print("BLE event loop stopped.") ble_event_loop.call_soon_threadsafe(ble_event_loop.stop)
ble_thread.join(timeout=1)
os._exit(0) print("BLE event loop stopped.")
if __name__ == '__main__': os._exit(0)
# Register the signal handler before running the app
signal.signal(signal.SIGINT, signal_handler) if __name__ == '__main__':
signal.signal(signal.SIGTERM, signal_handler) # Register the signal handler before running the app
signal.signal(signal.SIGINT, signal_handler)
# Initialize and start the Vision System signal.signal(signal.SIGTERM, signal_handler)
try:
vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"} # Initialize and start the Vision System
vision_system = VisionSystem(config=vision_config) try:
vision_system.start() vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"}
except Exception as e: vision_system = VisionSystem(config=vision_config)
print(f"Failed to initialize or start Vision System: {e}") vision_system.start()
vision_system = None except Exception as e:
print(f"Failed to initialize or start Vision System: {e}")
vision_system = None
if not DEBUG_MODE:
print("Starting BLE event loop in background thread...")
ble_event_loop = asyncio.new_event_loop() if not DEBUG_MODE:
ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True) print("Starting BLE event loop in background thread...")
ble_thread.start() ble_event_loop = asyncio.new_event_loop()
ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True)
# Connect to the device as soon as the app starts ble_thread.start()
future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop)
future.result(timeout=10) # Wait up to 10 seconds for connection # Connect to the device as soon as the app starts
future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop)
app.run(debug=True, use_reloader=False, host="0.0.0.0") future.result(timeout=10) # Wait up to 10 seconds for connection
app.run(debug=True, use_reloader=False, host="0.0.0.0")