pupilometer/src/controllerSoftware/app.py
2025-08-07 18:17:29 +07:00

237 lines
8.6 KiB
Python

from flask import Flask, render_template, request, jsonify
import asyncio
from bleak import BleakScanner, BleakClient
import threading
import time
import json
# =================================================================================================
# APP CONFIGURATION
# =================================================================================================
# Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix.
DEBUG_MODE = False

# --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
# Device name that the BLE scan results are matched against.
DEVICE_NAME = "Pupilometer LED Billboard"

# Shared BLE state. The BLE helper coroutines rebind these through their own
# `global` declarations; a `global` statement at module level is a no-op, so
# none is used here.
ble_client = None  # Client object created by the connection helper
ble_characteristics = None  # Characteristic list filled in after connecting
ble_event_loop = None  # Will be initialized if not in debug mode

# =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode)
# =================================================================================================
# Number of lamps addressed on the device; one 3-byte value is sent per lamp.
lampAmount = 25
def create_spiral_map(n=5):
    """Build an n x n grid that maps each (row, col) cell to a spiral address.

    The center cell gets address 0. Addresses then increase along a spiral
    whose first step goes up, followed by right, down and left, with segment
    lengths 1, 1, 2, 2, 3, 3, ...

    Args:
        n: Side length of the square matrix. Must be a positive odd integer.

    Returns:
        A list of ``n`` rows, each a list of ``n`` integers, in which every
        address from ``0`` to ``n * n - 1`` appears exactly once.

    Raises:
        ValueError: If ``n`` is even or not positive.
    """
    if n % 2 == 0:
        raise ValueError("Matrix size must be odd for a unique center point.")
    if n < 1:
        # Negative odd sizes pass the parity check but produce an empty grid.
        raise ValueError("Matrix size must be a positive odd number.")

    spiral_map = [[0] * n for _ in range(n)]
    r = c = n // 2
    address = 0
    last_address = n * n - 1
    spiral_map[r][c] = address

    # (row delta, col delta) in walking order: Up, Right, Down, Left
    moves = ((-1, 0), (0, 1), (1, 0), (0, -1))
    direction = 0
    segment_length = 1
    segments_walked = 0

    while address < last_address:
        d_row, d_col = moves[direction]
        for _ in range(segment_length):
            if address == last_address:
                # The final segment is one cell shorter than its nominal
                # length; stop here instead of stepping outside the grid.
                break
            address += 1
            r += d_row
            c += d_col
            spiral_map[r][c] = address
        direction = (direction + 1) % 4
        segments_walked += 1
        # The segment length grows after every second turn.
        if segments_walked % 2 == 0:
            segment_length += 1

    return spiral_map
def get_spiral_address(row, col, spiral_map):
    """Return the address stored at (row, col) in ``spiral_map``.

    Both indices are checked against the number of rows in ``spiral_map``.
    Returns -1 when either index falls outside that range.
    """
    size = len(spiral_map)
    inside = 0 <= row < size and 0 <= col < size
    return spiral_map[row][col] if inside else -1
# Spiral address lookup for the 5x5 matrix, computed once when the module loads.
SPIRAL_MAP_5x5 = create_spiral_map(5)
async def set_full_matrix_on_ble(full_matrix):
    """Send a full 5x5 lamp matrix to the BLE device, one write per lamp.

    Each cell ``full_matrix[row][col]`` is a mapping with the keys ``'ww'``,
    ``'cw'`` and ``'blue'``. The three values are converted to integers,
    clamped to 0..255 and packed into three bytes. The bytes are stored at
    the cell's spiral address, positions 3 <-> 7 and 12 <-> 24 are swapped,
    and entry ``i`` is written to the ``i``-th item of ``ble_characteristics``.

    If the client is not connected, one reconnect attempt is made first.
    The function returns without writing when the reconnect fails or when
    the number of known characteristics differs from ``lampAmount``.

    Args:
        full_matrix: 5x5 nested sequence of per-lamp channel mappings.

    Raises:
        Exception: Whatever the GATT write raises; it is printed first and
            then re-raised unchanged.
    """
    global ble_client
    global ble_characteristics

    if not ble_client or not ble_client.is_connected:
        print("BLE client not connected. Attempting to reconnect...")
        await connect_to_ble_device()
        if not ble_client or not ble_client.is_connected:
            print("Failed to reconnect to BLE client.")
            return
    else:
        print("Confirmed BLE connection status. Proceeding with lamp update.")

    print("Initializing blank canvas...")
    serial_colors = [b'\x00\x00\x00'] * lampAmount
    print(f"Initialized: {serial_colors}")

    for row in range(5):
        for col in range(5):
            lamp_data = full_matrix[row][col]
            print(f"Construcing lamp {row},{col} data: {lamp_data}")
            # Clamp each channel to one byte: bytes() raises ValueError for
            # anything outside 0..255, which would abort the whole update.
            color_bytes = bytes(
                max(0, min(255, int(lamp_data[channel])))
                for channel in ('ww', 'cw', 'blue')
            )
            spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5)
            print(f"Constructed data for {spiral_pos}: {color_bytes}")
            if spiral_pos != -1:
                serial_colors[spiral_pos] = color_bytes

    # =====================================================================
    # SNIPPET TO PATCH SWAPPED LAMP POSITIONS
    # =====================================================================
    print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
    serial_colors[3], serial_colors[7] = serial_colors[7], serial_colors[3]
    serial_colors[12], serial_colors[24] = serial_colors[24], serial_colors[12]
    # =====================================================================

    # Ensure all characteristics are available before writing. The list may
    # still be None if no connection ever populated it.
    found = len(ble_characteristics) if ble_characteristics else 0
    if found != lampAmount:
        print(f"Mismatch in lamp amount. Expected {lampAmount}, got {found}.")
        return

    print(f"Constructed the following matrix data: {serial_colors}")

    # Write each byte string to its corresponding characteristic
    for i, char in enumerate(ble_characteristics):
        value_to_write = serial_colors[i]
        print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}")
        try:
            await ble_client.write_gatt_char(char.uuid, value_to_write)
        except Exception as e:
            # Make the failure visible even when the caller never inspects
            # this coroutine's result, then let it propagate as before.
            print(f"Failed to write lamp {i} ({char.uuid}): {e}")
            raise
async def connect_to_ble_device():
    """Scan for the device named ``DEVICE_NAME``, connect and cache its characteristics.

    On success the module-level ``ble_client`` holds the connected client and
    ``ble_characteristics`` holds every characteristic of every discovered
    service, sorted by handle.

    Returns:
        True when the connection is established. False when the device is not
        found, the client reports it is not connected, or scanning or
        connecting raises an exception.
    """
    global ble_client
    global ble_characteristics

    print(f"Scanning for device: {DEVICE_NAME}...")
    try:
        devices = await BleakScanner.discover()
    except Exception as e:
        # A failed scan is reported the same way as a failed connection, so
        # callers always get a boolean instead of an exception.
        print(f"An error occurred during BLE scan: {e}")
        return False

    target_device = next((d for d in devices if d.name == DEVICE_NAME), None)
    if not target_device:
        print(f"Device '{DEVICE_NAME}' not found.")
        return False
    print(f"Found device: {target_device.name} ({target_device.address})")

    try:
        ble_client = BleakClient(target_device.address)
        await ble_client.connect()
        if not ble_client.is_connected:
            print(f"Failed to connect to {target_device.name}")
            return False

        print(f"Connected to {target_device.name}")
        services = ble_client.services
        # The previous logic for filtering services seems incorrect; let's grab all characteristics
        characteristics = [
            char for service in services for char in service.characteristics
        ]
        ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
        print(f"Found {len(ble_characteristics)} characteristics for lamps.")
        return True
    except Exception as e:
        print(f"An error occurred during BLE connection: {e}")
        return False
# =================================================================================================
# FLASK APPLICATION
# =================================================================================================
app = Flask(__name__)

# In-memory matrix for DEBUG_MODE: 5 rows of 5 hex color strings, all black at start.
lamp_matrix = [['#000000'] * 5 for _ in range(5)]
@app.route('/')
def index():
    """Render ``index.html`` with the matrix of colors to display."""
    # Debug mode shows the in-memory matrix; live mode passes a default black matrix.
    matrix = (
        lamp_matrix
        if DEBUG_MODE
        else [['#000000' for _ in range(5)] for _ in range(5)]
    )
    return render_template('index.html', matrix=matrix)
def _is_valid_matrix(matrix):
    """Return True if ``matrix`` is a 5x5 list of dicts that each carry 'ww', 'cw' and 'blue'."""
    if not isinstance(matrix, list) or len(matrix) != 5:
        return False
    for row in matrix:
        if not isinstance(row, list) or len(row) != 5:
            return False
        for cell in row:
            if not isinstance(cell, dict):
                return False
            if not all(key in cell for key in ('ww', 'cw', 'blue')):
                return False
    return True


def _log_ble_update_failure(future):
    """Print the exception, if any, raised by a scheduled BLE matrix update."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        print(f"BLE matrix update failed: {error!r}")


@app.route('/set_matrix', methods=['POST'])
def set_matrix():
    """Accept a 5x5 lamp matrix as JSON and apply it.

    The request body must be a JSON object whose ``'matrix'`` entry is a list
    of 5 rows, each a list of 5 objects with the keys ``'ww'``, ``'cw'`` and
    ``'blue'``.

    In DEBUG_MODE the in-memory ``lamp_matrix`` is updated with a hex color
    derived from the three channels. In live mode the update is scheduled on
    the BLE event loop and the response is sent without waiting for it, so
    ``success=True`` means "scheduled", not "written".

    Returns:
        A JSON response: ``success=True`` on success, or ``success=False``
        with a message and status 400 (invalid payload) or 500 (error).
    """
    data = request.get_json()
    # The body may be valid JSON without being an object (e.g. null or a list).
    full_matrix = data.get('matrix') if isinstance(data, dict) else None
    if not _is_valid_matrix(full_matrix):
        return jsonify(success=False, message="Invalid matrix data received"), 400
    print(f"Received the following matrix data: {full_matrix}")

    try:
        if DEBUG_MODE:
            # === DEBUG MODE: Update in-memory matrix ===
            for row in range(5):
                for col in range(5):
                    lamp_data = full_matrix[row][col]
                    # Same int() conversion as the live path applies.
                    ww = int(lamp_data['ww'])
                    cw = int(lamp_data['cw'])
                    blue = int(lamp_data['blue'])
                    # Convert ww, cw, blue to a hex color for UI display,
                    # keeping each component within 0..255.
                    r = max(0, min(255, ww + cw))
                    g = max(0, min(255, ww + cw + blue))
                    b = max(0, min(255, blue + cw))
                    lamp_matrix[row][col] = f'#{r:02x}{g:02x}{b:02x}'
            return jsonify(success=True)

        # === LIVE MODE: Communicate with the BLE device ===
        if ble_event_loop is None:
            # Checked before creating the coroutine so it is never left un-awaited.
            return jsonify(success=False, message="BLE event loop is not running"), 500
        future = asyncio.run_coroutine_threadsafe(
            set_full_matrix_on_ble(full_matrix),
            ble_event_loop
        )
        # The result is not awaited here; report background failures instead
        # of letting them disappear with the discarded future.
        future.add_done_callback(_log_ble_update_failure)
        return jsonify(success=True)
    except Exception as e:
        print(f"Error in set_matrix route: {e}")
        return jsonify(success=False, message=str(e)), 500
# =================================================================================================
# APP STARTUP
# =================================================================================================
if __name__ == '__main__':
    if not DEBUG_MODE:
        # Local import: only the live-mode startup path needs this name.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        print("Starting BLE event loop in background thread...")
        ble_event_loop = asyncio.new_event_loop()
        ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True)
        ble_thread.start()

        # Connect to the device as soon as the app starts
        future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop)
        try:
            connected = future.result(timeout=10)  # Wait up to 10 seconds for connection
        except FutureTimeoutError:
            # The attempt keeps running on the BLE loop; do not abort startup.
            print("Initial BLE connection did not finish within 10 seconds; starting the web app anyway.")
        except Exception as e:
            print(f"Initial BLE connection attempt failed: {e}")
        else:
            if not connected:
                # set_full_matrix_on_ble() tries to reconnect on the next update.
                print("Initial BLE connection was not established; starting the web app anyway.")

    # NOTE(review): debug=True turns on Flask's debug mode - confirm this is intended outside development.
    app.run(debug=True, use_reloader=False)