Add new files and update existing ones

This commit is contained in:
Tempest 2025-11-27 22:22:56 +07:00
parent 77a4eca751
commit 60fa88926f
16 changed files with 2053 additions and 1197 deletions

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
# Virtual Environment
.venv/
# Python cache
__pycache__/
*.pyc
# Test artifacts
app_stdout.log
app_stderr.log
screenshots/

View File

@ -1,5 +1,28 @@
### Pupilometer ### Pupilometer
## Introduction ## Introduction
This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder. This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder.
## Dependencies
### Python Dependencies
The Python dependencies are listed in the `requirements.txt` file. You can install them using pip:
```bash
pip install -r requirements.txt
```
### NVIDIA DeepStream
For running the pupil segmentation on a Jetson Orin AGX or a Windows machine with an NVIDIA GPU, this project uses NVIDIA DeepStream. DeepStream is a complex dependency and cannot be installed via pip.
Please follow the official NVIDIA documentation to install DeepStream for your platform:
* **Jetson:** [DeepStream for Jetson](https://developer.nvidia.com/deepstream-sdk-jetson)
* **Windows:** [DeepStream for Windows](https://developer.nvidia.com/deepstream-sdk-windows)
You will also need to install GStreamer and the Python bindings (PyGObject). These are usually installed as part of the DeepStream installation.
Additionally, the `pyds` library, which provides Python bindings for DeepStream metadata structures, is required. This library is also included with the DeepStream SDK and may need to be installed manually.

View File

@ -1,2 +1,8 @@
bleak>="1.0.0" bleak>="1.0.0"
flask>="3.1.1" flask>="3.1.1"
pypylon>= "4.0.0"
onnxruntime>= "1.18.0"
opencv-python>= "4.9.0"
pytest>= "8.0.0"
pytest-playwright>= "0.4.0"
requests>= "2.31.0"

View File

@ -1,298 +1,343 @@
from flask import Flask, render_template, request, jsonify from flask import Flask, render_template, request, jsonify
import asyncio import asyncio
from bleak import BleakScanner, BleakClient from bleak import BleakScanner, BleakClient
import threading import threading
import time import time
import json import json
import sys import sys
import signal import signal
import os import os
from vision import VisionSystem
# =================================================================================================
# APP CONFIGURATION # =================================================================================================
# ================================================================================================= # APP CONFIGURATION
# =================================================================================================
# Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix. # Set to True to run without a physical BLE device for testing purposes.
DEBUG_MODE = True # Set to False to connect to the actual lamp matrix.
DEBUG_MODE = True
# --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
DEVICE_NAME = "Pupilometer LED Billboard" # --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
global ble_client DEVICE_NAME = "Pupilometer LED Billboard"
global ble_characteristics global ble_client
ble_client = None global ble_characteristics
ble_characteristics = None ble_client = None
ble_event_loop = None # Will be initialized if not in debug mode ble_characteristics = None
ble_event_loop = None # Will be initialized if not in debug mode
# ================================================================================================= ble_connection_status = False
# BLE HELPER FUNCTIONS (Used in LIVE mode)
# ================================================================================================= # =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode)
lampAmount = 25 # =================================================================================================
def create_spiral_map(n=5): lampAmount = 25
if n % 2 == 0:
raise ValueError("Matrix size must be odd for a unique center point.") def create_spiral_map(n=5):
spiral_map = [[0] * n for _ in range(n)] if n % 2 == 0:
r, c = n // 2, n // 2 raise ValueError("Matrix size must be odd for a unique center point.")
address = 0 spiral_map = [[0] * n for _ in range(n)]
spiral_map[r][c] = address r, c = n // 2, n // 2
address = 0
# Updated directions to start moving UP first instead of right spiral_map[r][c] = address
dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left
dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left # Updated directions to start moving UP first instead of right
dr = [-1, 0, 1, 0] # Change in row: Up, Right, Down, Left
direction = 0 dc = [0, 1, 0, -1] # Change in col: Up, Right, Down, Left
segment_length = 1
steps = 0 direction = 0
while address < n * n - 1: segment_length = 1
for _ in range(segment_length): steps = 0
address += 1 while address < n * n - 1:
r += dr[direction] for _ in range(segment_length):
c += dc[direction] address += 1
if 0 <= r < n and 0 <= c < n: r += dr[direction]
spiral_map[r][c] = address c += dc[direction]
direction = (direction + 1) % 4 if 0 <= r < n and 0 <= c < n:
steps += 1 spiral_map[r][c] = address
if steps % 2 == 0: direction = (direction + 1) % 4
segment_length += 1 steps += 1
return spiral_map if steps % 2 == 0:
segment_length += 1
return spiral_map
def get_spiral_address(row, col, spiral_map):
n = len(spiral_map)
if 0 <= row < n and 0 <= col < n: def get_spiral_address(row, col, spiral_map):
return spiral_map[row][col] n = len(spiral_map)
else: if 0 <= row < n and 0 <= col < n:
return -1 return spiral_map[row][col]
else:
SPIRAL_MAP_5x5 = create_spiral_map(5) return -1
async def set_full_matrix_on_ble(colorSeries): SPIRAL_MAP_5x5 = create_spiral_map(5)
global ble_client
global ble_characteristics async def set_full_matrix_on_ble(colorSeries):
global ble_client
if not ble_client or not ble_client.is_connected: global ble_characteristics
print("BLE client not connected. Attempting to reconnect...")
await connect_to_ble_device() if not ble_client or not ble_client.is_connected:
if not ble_client or not ble_client.is_connected: print("BLE client not connected. Attempting to reconnect...")
print("Failed to reconnect to BLE client.") await connect_to_ble_device()
return if not ble_client or not ble_client.is_connected:
else: print("Failed to reconnect to BLE client.")
print("Confirmed BLE connection status. Proceeding with lamp update.") return
else:
# ===================================================================== print("Confirmed BLE connection status. Proceeding with lamp update.")
# SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# ===================================================================== # =====================================================================
print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") # SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# =====================================================================
# Swap data for lamps at positions 3 and 7 print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
temp_color_3 = colorSeries[3]
colorSeries[3] = colorSeries[7] # Swap data for lamps at positions 3 and 7
colorSeries[7] = temp_color_3 temp_color_3 = colorSeries[3]
colorSeries[3] = colorSeries[7]
# Swap data for lamps at positions 12 and 24 colorSeries[7] = temp_color_3
temp_color_12 = colorSeries[12]
colorSeries[12] = colorSeries[24] # Swap data for lamps at positions 12 and 24
colorSeries[24] = temp_color_12 temp_color_12 = colorSeries[12]
# ===================================================================== colorSeries[12] = colorSeries[24]
colorSeries[24] = temp_color_12
if DEBUG_MODE: # =====================================================================
# Ensure all characteristics are available before writing
print(f"Confirmed DEBUG set to true.") if DEBUG_MODE:
if len(ble_characteristics) != lampAmount: # Ensure all characteristics are available before writing
print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.") print(f"Confirmed DEBUG set to true.")
return if len(ble_characteristics) != lampAmount:
print(f"Constructed the following matrix data: {colorSeries}") print(f"Mismatch in lamp amount. Expected {lampAmount}, got {len(ble_characteristics)}.")
# Write each byte string to its corresponding characteristic return
for i, char in enumerate(ble_characteristics): print(f"Constructed the following matrix data: {colorSeries}")
value_to_write = colorSeries[i] # Write each byte string to its corresponding characteristic
print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}") for i, char in enumerate(ble_characteristics):
await ble_client.write_gatt_char(char.uuid, value_to_write) value_to_write = colorSeries[i]
else: print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}")
print(f"Confirmed DEBUG set to false.") await ble_client.write_gatt_char(char.uuid, value_to_write)
value_to_write = b"".join([color for color in colorSeries]) else:
print(value_to_write) print(f"Confirmed DEBUG set to false.")
print(f"Setting lamps to {value_to_write.hex()}") value_to_write = b"".join([color for color in colorSeries])
await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write) print(value_to_write)
print(f"Setting lamps to {value_to_write.hex()}")
await ble_client.write_gatt_char(ble_characteristics[0].uuid, value_to_write)
async def connect_to_ble_device():
global ble_client
global ble_characteristics async def connect_to_ble_device():
global ble_client
print(f"Scanning for device: {DEVICE_NAME}...") global ble_characteristics
devices = await BleakScanner.discover() global ble_connection_status
target_device = next((d for d in devices if d.name == DEVICE_NAME), None)
print(f"Scanning for device: {DEVICE_NAME}...")
if not target_device: devices = await BleakScanner.discover()
print(f"Device '{DEVICE_NAME}' not found.") target_device = next((d for d in devices if d.name == DEVICE_NAME), None)
return False
if not target_device:
print(f"Found device: {target_device.name} ({target_device.address})") print(f"Device '{DEVICE_NAME}' not found.")
ble_connection_status = False
try: return False
ble_client = BleakClient(target_device.address)
await ble_client.connect() print(f"Found device: {target_device.name} ({target_device.address})")
if ble_client.is_connected:
print(f"Connected to {target_device.name}") try:
services = [service for service in ble_client.services if service.handle != 1] ble_client = BleakClient(target_device.address)
await ble_client.connect()
# The previous logic for filtering services seems incorrect; let's grab all characteristics if ble_client.is_connected:
characteristics = [ print(f"Connected to {target_device.name}")
char for service in services for char in service.characteristics services = [service for service in ble_client.services if service.handle != 1]
]
ble_characteristics = sorted(characteristics, key=lambda char: char.handle) # The previous logic for filtering services seems incorrect; let's grab all characteristics
print(f"Found {len(ble_characteristics)} characteristics for lamps.") characteristics = [
return True char for service in services for char in service.characteristics
else: ]
print(f"Failed to connect to {target_device.name}") ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
return False print(f"Found {len(ble_characteristics)} characteristics for lamps.")
except Exception as e: ble_connection_status = True
print(f"An error occurred during BLE connection: {e}") return True
return False else:
# ================================================================================================= print(f"Failed to connect to {target_device.name}")
# COLOR MIXING ble_connection_status = False
# ================================================================================================= return False
except Exception as e:
def calculate_rgb(ww, cw, blue): print(f"An error occurred during BLE connection: {e}")
""" ble_connection_status = False
Calculates the combined RGB color from warm white, cool white, and blue light values. return False
This function is a Python equivalent of the JavaScript color mixer in index.html. # =================================================================================================
""" # COLOR MIXING
# Define the RGB components for each light source based on slider track colors # =================================================================================================
warm_white_r, warm_white_g, warm_white_b = 255, 192, 128
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255 def calculate_rgb(ww, cw, blue):
blue_r, blue_g, blue_b = 0, 0, 255 """
Calculates the combined RGB color from warm white, cool white, and blue light values.
# Normalize the slider values (0-255) and apply them to the base colors This function is a Python equivalent of the JavaScript color mixer in index.html.
r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r """
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g # Define the RGB components for each light source based on slider track colors
b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b warm_white_r, warm_white_g, warm_white_b = 255, 192, 128
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255
# Clamp the values to 255 and convert to integer blue_r, blue_g, blue_b = 0, 0, 255
r = int(min(255, round(r)))
g = int(min(255, round(g))) # Normalize the slider values (0-255) and apply them to the base colors
b = int(min(255, round(b))) r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g
return r, g, b b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b
def rgb_to_hex(r, g, b): # Clamp the values to 255 and convert to integer
""" r = int(min(255, round(r)))
Converts RGB color values to a hex color string. g = int(min(255, round(g)))
""" b = int(min(255, round(b)))
# Ensure values are within the valid range (0-255) and are integers
r = int(max(0, min(255, r))) return r, g, b
g = int(max(0, min(255, g)))
b = int(max(0, min(255, b))) def rgb_to_hex(r, g, b):
"""
# Convert each component to a two-digit hexadecimal string Converts RGB color values to a hex color string.
return f'#{r:02x}{g:02x}{b:02x}' """
# Ensure values are within the valid range (0-255) and are integers
# ================================================================================================= r = int(max(0, min(255, r)))
# FLASK APPLICATION g = int(max(0, min(255, g)))
# ================================================================================================= b = int(max(0, min(255, b)))
app = Flask(__name__) # Convert each component to a two-digit hexadecimal string
return f'#{r:02x}{g:02x}{b:02x}'
# In-memory matrix for DEBUG_MODE
lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)] # =================================================================================================
# FLASK APPLICATION
@app.route('/') # =================================================================================================
def index():
print(f"Getting current lamp matrix info: {lamp_matrix}") app = Flask(__name__)
if DEBUG_MODE:
return render_template('index.html', matrix=lamp_matrix) # In-memory matrix for DEBUG_MODE
else: lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
# In live mode, we'll pass a default black matrix.
initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)] @app.route('/')
return render_template('index.html', matrix=initial_matrix) def index():
print(f"Getting current lamp matrix info: {lamp_matrix}")
if DEBUG_MODE:
@app.route('/set_matrix', methods=['POST']) return render_template('index.html', matrix=lamp_matrix)
def set_matrix(): else:
data = request.get_json() # In live mode, we'll pass a default black matrix.
full_matrix = data.get('matrix', []) initial_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
matrixDataSerialized = [] return render_template('index.html', matrix=initial_matrix)
if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5:
return jsonify(success=False, message="Invalid matrix data received"), 400
else: @app.route('/set_matrix', methods=['POST'])
print(f"Received the following matrix data: {full_matrix}") def set_matrix():
data = request.get_json()
#Creating empty byte array full_matrix = data.get('matrix', [])
serial_colors = [b'\x00\x00\x00'] * lampAmount matrixDataSerialized = []
if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5:
try: return jsonify(success=False, message="Invalid matrix data received"), 400
for row in range(5): else:
for col in range(5): print(f"Received the following matrix data: {full_matrix}")
lamp_data = full_matrix[row][col]
ww = int(lamp_data['ww']) #Creating empty byte array
cw = int(lamp_data['cw']) serial_colors = [b'\x00\x00\x00'] * lampAmount
blue = int(lamp_data['blue'])
try:
#Preparing byte data for control command for row in range(5):
color_bytes = bytes([ww, cw, blue]) for col in range(5):
spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5) lamp_data = full_matrix[row][col]
print(f"Constructed data for {spiral_pos}: {color_bytes}") ww = int(lamp_data['ww'])
if spiral_pos != -1: cw = int(lamp_data['cw'])
serial_colors[spiral_pos] = color_bytes blue = int(lamp_data['blue'])
#Preparing hex color data for frontend
lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue) #Preparing byte data for control command
lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB) color_bytes = bytes([ww, cw, blue])
if DEBUG_MODE: spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5)
# === DEBUG MODE: Update in-memory matrix === print(f"Constructed data for {spiral_pos}: {color_bytes}")
return jsonify(success=True) if spiral_pos != -1:
else: serial_colors[spiral_pos] = color_bytes
# === LIVE MODE: Communicate with the BLE device === #Preparing hex color data for frontend
asyncio.run_coroutine_threadsafe( lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue)
set_full_matrix_on_ble(serial_colors), lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB)
ble_event_loop if DEBUG_MODE:
) # === DEBUG MODE: Update in-memory matrix ===
return jsonify(success=True) return jsonify(success=True)
except Exception as e: else:
print(f"Error in set_matrix route: {e}") # === LIVE MODE: Communicate with the BLE device ===
return jsonify(success=False, message=str(e)), 500 asyncio.run_coroutine_threadsafe(
set_full_matrix_on_ble(serial_colors),
print(f"Getting current lamp matrix info: {lamp_matrix}") ble_event_loop
)
# ================================================================================================= return jsonify(success=True)
# APP STARTUP except Exception as e:
# ================================================================================================= print(f"Error in set_matrix route: {e}")
return jsonify(success=False, message=str(e)), 500
def signal_handler(signum, frame):
print("Received shutdown signal, gracefully shutting down...") print(f"Getting current lamp matrix info: {lamp_matrix}")
if not DEBUG_MODE and ble_client and ble_client.is_connected:
print("Disconnecting BLE client...") @app.route('/ble_status')
disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) def ble_status():
try: global ble_connection_status
# Wait for the disconnect to complete with a timeout if DEBUG_MODE:
disconnect_future.result(timeout=5) return jsonify(connected=True)
print("BLE client disconnected successfully.") return jsonify(connected=ble_connection_status)
except Exception as e:
print(f"Error during BLE disconnect: {e}") @app.route('/vision/pupil_data')
def get_pupil_data():
if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running(): """
print("Stopping BLE event loop...") Endpoint to get the latest pupil segmentation data from the vision system.
# Schedule a stop and wait for the thread to finish """
ble_event_loop.call_soon_threadsafe(ble_event_loop.stop) if vision_system:
ble_thread.join(timeout=1) data = vision_system.get_pupil_data()
print("BLE event loop stopped.") return jsonify(success=True, data=data)
return jsonify(success=False, message="Vision system not initialized"), 500
os._exit(0)
# =================================================================================================
if __name__ == '__main__': # APP STARTUP
# Register the signal handler before running the app # =================================================================================================
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) vision_system = None
if not DEBUG_MODE: def signal_handler(signum, frame):
print("Starting BLE event loop in background thread...") print("Received shutdown signal, gracefully shutting down...")
ble_event_loop = asyncio.new_event_loop() global ble_connection_status
ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True)
ble_thread.start() # Stop the vision system
if vision_system:
# Connect to the device as soon as the app starts print("Stopping vision system...")
future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop) vision_system.stop()
future.result(timeout=10) # Wait up to 10 seconds for connection print("Vision system stopped.")
app.run(debug=True, use_reloader=False, host="0.0.0.0") if not DEBUG_MODE and ble_client and ble_client.is_connected:
print("Disconnecting BLE client...")
ble_connection_status = False
disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop)
try:
# Wait for the disconnect to complete with a timeout
disconnect_future.result(timeout=5)
print("BLE client disconnected successfully.")
except Exception as e:
print(f"Error during BLE disconnect: {e}")
if not DEBUG_MODE and ble_event_loop and ble_event_loop.is_running():
print("Stopping BLE event loop...")
# Schedule a stop and wait for the thread to finish
ble_event_loop.call_soon_threadsafe(ble_event_loop.stop)
ble_thread.join(timeout=1)
print("BLE event loop stopped.")
os._exit(0)
if __name__ == '__main__':
# Register the signal handler before running the app
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Initialize and start the Vision System
try:
vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"}
vision_system = VisionSystem(config=vision_config)
vision_system.start()
except Exception as e:
print(f"Failed to initialize or start Vision System: {e}")
vision_system = None
if not DEBUG_MODE:
print("Starting BLE event loop in background thread...")
ble_event_loop = asyncio.new_event_loop()
ble_thread = threading.Thread(target=ble_event_loop.run_forever, daemon=True)
ble_thread.start()
# Connect to the device as soon as the app starts
future = asyncio.run_coroutine_threadsafe(connect_to_ble_device(), ble_event_loop)
future.result(timeout=10) # Wait up to 10 seconds for connection
app.run(debug=True, use_reloader=False, host="0.0.0.0")

View File

View File

@ -0,0 +1,235 @@
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import pyds
import threading
try:
from pypylon import pylon
except ImportError:
print("pypylon is not installed. DeepStreamBackend will not be able to get frames from Basler camera.")
pylon = None
class DeepStreamPipeline:
"""
A class to manage the DeepStream pipeline for pupil segmentation.
"""
def __init__(self, config):
self.config = config
Gst.init(None)
self.pipeline = None
self.loop = GLib.MainLoop()
self.pupil_data = None
self.camera = None
self.frame_feeder_thread = None
self.is_running = False
print("DeepStreamPipeline initialized.")
def _frame_feeder_thread(self, appsrc):
"""
Thread function to feed frames from the Basler camera to the appsrc element.
"""
while self.is_running:
if not self.camera or not self.camera.IsGrabbing():
print("Camera not ready, stopping frame feeder.")
break
try:
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
frame = grab_result.Array
# Create a Gst.Buffer
buf = Gst.Buffer.new_allocate(None, len(frame), None)
buf.fill(0, frame)
# Push the buffer into the appsrc
appsrc.emit('push-buffer', buf)
else:
print(f"Error grabbing frame: {grab_result.ErrorCode}")
except Exception as e:
print(f"An error occurred in frame feeder thread: {e}")
break
finally:
if 'grab_result' in locals() and grab_result:
grab_result.Release()
def bus_call(self, bus, message, loop):
"""
Callback function for handling messages from the GStreamer bus.
"""
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
self.is_running = False
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s\n" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
self.is_running = False
loop.quit()
return True
def pgie_sink_pad_buffer_probe(self, pad, info, u_data):
"""
Probe callback function for the sink pad of the pgie element.
"""
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the address of gst_buffer as input, which is a ptr.
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
frame_meta = pyds.glist_get_data(l_frame)
except StopIteration:
break
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.glist_get_data(l_obj)
except StopIteration:
break
# Access and process object metadata
rect_params = obj_meta.rect_params
top = rect_params.top
left = rect_params.left
width = rect_params.width
height = rect_params.height
self.pupil_data = {
"bounding_box": [left, top, left + width, top + height],
"confidence": obj_meta.confidence
}
print(f"Pupil detected: {self.pupil_data}")
try:
l_obj = l_obj.next
except StopIteration:
break
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def start(self):
"""
Builds and starts the DeepStream pipeline.
"""
if not pylon:
raise ImportError("pypylon is not installed. Cannot start DeepStreamPipeline with Basler camera.")
# Initialize camera
try:
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
print("DeepStreamPipeline: Basler camera opened and started grabbing.")
except Exception as e:
print(f"DeepStreamPipeline: Error opening Basler camera: {e}")
return
self.pipeline = Gst.Pipeline()
if not self.pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
return
source = Gst.ElementFactory.make("appsrc", "app-source")
# ... (element creation remains the same)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
sink = Gst.ElementFactory.make("fakesink", "sink")
videoconvert = Gst.ElementFactory.make("nvvideoconvert", "nv-videoconvert")
# Set appsrc properties
# TODO: Set caps based on camera properties
caps = Gst.Caps.from_string("video/x-raw,format=GRAY8,width=1280,height=720,framerate=30/1")
source.set_property("caps", caps)
source.set_property("format", "time")
pgie.set_property('config-file-path', "pgie_yolov10_config.txt")
self.pipeline.add(source)
self.pipeline.add(videoconvert)
self.pipeline.add(pgie)
self.pipeline.add(sink)
if not source.link(videoconvert):
sys.stderr.write(" Unable to link source to videoconvert \n")
return
if not videoconvert.link(pgie):
sys.stderr.write(" Unable to link videoconvert to pgie \n")
return
if not pgie.link(sink):
sys.stderr.write(" Unable to link pgie to sink \n")
return
pgie_sink_pad = pgie.get_static_pad("sink")
if not pgie_sink_pad:
sys.stderr.write(" Unable to get sink pad of pgie \n")
return
pgie_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_sink_pad_buffer_probe, 0)
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.bus_call, self.loop)
self.is_running = True
self.frame_feeder_thread = threading.Thread(target=self._frame_feeder_thread, args=(source,))
self.frame_feeder_thread.start()
print("Starting pipeline...")
self.pipeline.set_state(Gst.State.PLAYING)
print("DeepStreamPipeline started.")
def stop(self):
"""
Stops the DeepStream pipeline.
"""
self.is_running = False
if self.frame_feeder_thread:
self.frame_feeder_thread.join()
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
print("DeepStreamPipeline stopped.")
if self.camera and self.camera.IsGrabbing():
self.camera.StopGrabbing()
if self.camera and self.camera.IsOpen():
self.camera.Close()
print("DeepStreamPipeline: Basler camera closed.")
def get_data(self):
"""
Retrieves data from the pipeline.
"""
return self.pupil_data
if __name__ == '__main__':
config = {}
pipeline = DeepStreamPipeline(config)
pipeline.start()
# Run the GLib main loop in the main thread
try:
pipeline.loop.run()
except KeyboardInterrupt:
print("Interrupted by user.")
pipeline.stop()

View File

View File

View File

@ -0,0 +1,18 @@
[property]
gpu-id=0
net-scale-factor=0.00392156862745098
#onnx-file=yolov10.onnx
model-engine-file=model.engine
#labelfile-path=labels.txt
batch-size=1
process-mode=1
model-color-format=0
network-mode=0
num-detected-classes=1
gie-unique-id=1
output-blob-names=output0
[class-attrs-all]
pre-cluster-threshold=0.2
eps=0.2
group-threshold=1

View File

@ -0,0 +1,285 @@
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
var selectedLamps = [];
// Function to calculate a visual RGB color from the three light values using a proper additive model
function calculateRgb(ww, cw, blue) {
// Define the RGB components for each light source based on slider track colors
const warmWhiteR = 255;
const warmWhiteG = 192;
const warmWhiteB = 128;
const coolWhiteR = 192;
const coolWhiteG = 224;
const coolWhiteB = 255;
const blueR = 0;
const blueG = 0;
const blueB = 255;
// Normalize the slider values (0-255) and apply them to the base colors
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR;
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteG + (blue / 255) * blueG;
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB;
// Clamp the values to 255 and convert to integer
r = Math.min(255, Math.round(r));
g = Math.min(255, Math.round(g));
b = Math.min(255, Math.round(b));
// Convert to hex string
var toHex = (c) => ('0' + c.toString(16)).slice(-2);
return '#' + toHex(r) + toHex(g) + toHex(b);
}
function updateLampUI(lamp, colorState) {
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue);
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`);
lampElement.css('background-color', newColor);
if (newColor === '#000000') {
lampElement.removeClass('on');
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`);
} else {
lampElement.addClass('on');
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`);
}
}
// Function to update the UI and send the full matrix state to the backend
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) {
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww,
cw: lamp.cw,
blue: lamp.blue
})));
$.ajax({
url: '/set_matrix',
type: 'POST',
contentType: 'application/json',
data: JSON.stringify({ matrix: fullMatrixData }),
success: function(response) {
if (response.success) {
if (isRegionUpdate) {
// On a region button click, update the entire matrix UI
for (var r = 0; r < 5; r++) {
for (var c = 0; c < 5; c++) {
updateLampUI({row: r, col: c}, lampMatrixState[r][c]);
}
}
} else {
// Otherwise, just update the lamps that changed
lampsToUpdate.forEach(function(lamp) {
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]);
});
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
// Exclude the center lamp from the 'All' region
var allRegionWithoutCenter = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
regionMaps['All'] = allRegionWithoutCenter;
// Initialize lampMatrixState from the initial HTML colors
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: rgb[0], cw: rgb[1], blue: rgb[2]
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
// Toggle the inactive state of the control panel based on selection
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
// Clear selected class from all lamps
$('.lamp').removeClass('selected');
// Get the current slider values to use as the new default
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
// Reset all lamps except the center to black in our state
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
lampMatrixState[2][2] = centerLampState; // Preserve center lamp state
// Set newly selected lamps to the current slider values
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
// Update sliders to reflect the state of the first selected lamp
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
// Send the full matrix state
sendFullMatrixUpdate(lampsToUpdate, true);
});
// Event listener for the region sliders and number inputs
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
// Event listener for the center lamp sliders and number inputs
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Initial check to set the inactive state
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
function checkBleStatus() {
$.ajax({
url: '/ble_status',
type: 'GET',
success: function(response) {
var statusElement = $('#ble-status');
if (response.connected) {
statusElement.text('BLE Connected');
statusElement.css('color', 'lightgreen');
} else {
statusElement.text('BLE Disconnected');
statusElement.css('color', 'red');
}
},
error: function() {
var statusElement = $('#ble-status');
statusElement.text('Reconnecting...');
statusElement.css('color', 'orange');
}
});
}
setInterval(checkBleStatus, 2000);
checkBleStatus(); // Initial check
});

View File

@ -1,151 +1,162 @@
:root { :root {
--matrix-width: calc(5 * 70px + 4 * 20px); --matrix-width: calc(5 * 70px + 4 * 20px);
} }
body { body {
font-family: Arial, sans-serif; font-family: Arial, sans-serif;
display: flex; display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
margin: 0; margin: 0;
background-color: #f0f0f0; background-color: #f0f0f0;
min-height: 100vh; min-height: 100vh;
} }
.container { .container {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
position: relative; position: relative;
} }
.main-content { .main-content {
display: flex; display: flex;
flex-direction: row; flex-direction: row;
align-items: flex-start; align-items: flex-start;
gap: 40px; gap: 40px;
} }
.matrix-grid { .matrix-grid {
display: grid; display: grid;
grid-template-columns: repeat(5, 70px); grid-template-columns: repeat(5, 70px);
grid-template-rows: repeat(5, 70px); grid-template-rows: repeat(5, 70px);
gap: 20px; gap: 20px;
padding: 20px; padding: 20px;
background-color: #333; background-color: #333;
border-radius: 10px; border-radius: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2); box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
margin-bottom: 20px; margin-bottom: 20px;
} }
.lamp { .lamp {
width: 70px; width: 70px;
height: 70px; height: 70px;
border-radius: 10%; border-radius: 10%;
background-color: #000; background-color: #000;
transition: box-shadow 0.2s, transform 0.1s; transition: box-shadow 0.2s, transform 0.1s;
cursor: pointer; cursor: pointer;
border: 2px solid transparent; border: 2px solid transparent;
} }
.lamp.on { .lamp.on {
box-shadow: 0 0 15px currentColor, 0 0 25px currentColor; box-shadow: 0 0 15px currentColor, 0 0 25px currentColor;
} }
.lamp.selected { .lamp.selected {
border: 2px solid #fff; border: 2px solid #fff;
transform: scale(1.1); transform: scale(1.1);
} }
h1 { h1 {
color: #333; color: #333;
margin-bottom: 20px; margin-bottom: 20px;
} }
.region-control { .region-control {
margin-bottom: 20px; margin-bottom: 20px;
text-align: center; text-align: center;
} }
.region-control select { .region-control select {
padding: 10px 15px; padding: 10px 15px;
font-size: 14px; font-size: 14px;
cursor: pointer; cursor: pointer;
border: 1px solid #ccc; border: 1px solid #ccc;
border-radius: 5px; border-radius: 5px;
background-color: #fff; background-color: #fff;
width: 200px; width: 200px;
} }
.control-panel, .center-lamp-control { .control-panel, .center-lamp-control {
background-color: #444; background-color: #444;
padding: 20px; padding: 20px;
border-radius: 10px; border-radius: 10px;
width: var(--matrix-width); /* Fixed width for consistency */ width: var(--matrix-width); /* Fixed width for consistency */
max-width: var(--matrix-width); max-width: var(--matrix-width);
margin-bottom: 20px; margin-bottom: 20px;
} }
.control-panel.inactive-control { .control-panel.inactive-control {
background-color: #333; background-color: #333;
filter: saturate(0.2); filter: saturate(0.2);
} }
.control-panel.inactive-control .slider-row { .control-panel.inactive-control .slider-row {
pointer-events: none; pointer-events: none;
} }
.control-panel h2, .center-lamp-control h2 { .control-panel h2, .center-lamp-control h2 {
color: #fff; color: #fff;
font-size: 16px; font-size: 16px;
margin-bottom: 10px; margin-bottom: 10px;
text-align: center; text-align: center;
} }
.slider-group { .slider-group {
width: 100%; width: 100%;
display: flex; display: flex;
flex-direction: column; flex-direction: column;
gap: 5px; gap: 5px;
} }
.slider-row { .slider-row {
display: grid; display: grid;
grid-template-columns: 150px 1fr 50px; grid-template-columns: 150px 1fr 50px;
gap: 10px; gap: 10px;
align-items: center; align-items: center;
} }
.slider-group input[type="range"] { .slider-group input[type="range"] {
-webkit-appearance: none; -webkit-appearance: none;
height: 8px; height: 8px;
border-radius: 5px; border-radius: 5px;
outline: none; outline: none;
cursor: pointer; cursor: pointer;
} }
.slider-group input[type="number"] { .slider-group input[type="number"] {
width: 100%; width: 100%;
font-size: 14px; font-size: 14px;
text-align: center; text-align: center;
border: none; border: none;
border-radius: 5px; border-radius: 5px;
padding: 5px; padding: 5px;
} }
.slider-group input[type="range"]::-webkit-slider-thumb { .slider-group input[type="range"]::-webkit-slider-thumb {
-webkit-appearance: none; -webkit-appearance: none;
height: 20px; height: 20px;
width: 20px; width: 20px;
border-radius: 50%; border-radius: 50%;
background: #fff; background: #fff;
cursor: pointer; cursor: pointer;
box-shadow: 0 0 5px rgba(0,0,0,0.5); box-shadow: 0 0 5px rgba(0,0,0,0.5);
margin-top: 2px; margin-top: 2px;
} }
.slider-group input[type="range"]::-webkit-slider-runnable-track { .slider-group input[type="range"]::-webkit-slider-runnable-track {
height: 24px; height: 24px;
border-radius: 12px; border-radius: 12px;
} }
input.white-3000k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #ffc080); } input.white-3000k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #ffc080); }
input.white-6500k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #c0e0ff); } input.white-6500k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #c0e0ff); }
input.blue::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #00f); } input.blue::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #00f); }
.slider-label { .slider-label {
color: #fff; color: #fff;
font-size: 14px; font-size: 14px;
text-align: left; text-align: left;
white-space: nowrap; white-space: nowrap;
width: 120px; width: 120px;
} }
.inactive-control .slider-label { .inactive-control .slider-label {
color: #888; color: #888;
} }
@media (max-width: 1000px) { @media (max-width: 1000px) {
.main-content { .main-content {
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
} }
}
#ble-status {
position: fixed;
top: 10px;
right: 10px;
font-size: 16px;
color: #fff;
background-color: #333;
padding: 5px 10px;
border-radius: 5px;
} }

View File

@ -1,342 +1,82 @@
<!DOCTYPE html> <!DOCTYPE html>
<html> <html>
<head> <head>
<title>Lamp Matrix Control</title> <title>Lamp Matrix Control</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}"> <link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script> <script src="{{ url_for('static', filename='script.js') }}"></script>
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp </head>
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0})); <body>
var selectedLamps = []; <div class="container">
<div id="ble-status"></div>
// Function to calculate a visual RGB color from the three light values using a proper additive model <h1>Lamp Matrix Control</h1>
function calculateRgb(ww, cw, blue) { <div class="region-control">
// Define the RGB components for each light source based on slider track colors <label for="region-select">Select Region:</label>
const warmWhiteR = 255; <select id="region-select">
const warmWhiteG = 192; <option value="" disabled selected>-- Select a region --</option>
const warmWhiteB = 128; <option value="Upper">Upper</option>
<option value="Lower">Lower</option>
const coolWhiteR = 192; <option value="Left">Left</option>
const coolWhiteG = 224; <option value="Right">Right</option>
const coolWhiteB = 255; <option value="Inner ring">Inner ring</option>
<option value="Outer ring">Outer ring</option>
const blueR = 0; <option value="All">All</option>
const blueG = 0; </select>
const blueB = 255; </div>
// Normalize the slider values (0-255) and apply them to the base colors <div class="main-content">
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR; <div class="matrix-grid">
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteG + (blue / 255) * blueG; {% for row in range(5) %}
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB; {% for col in range(5) %}
<div class="lamp" data-row="{{ row }}" data-col="{{ col }}" style="background-color: {{ matrix[row][col] }}; box-shadow: {{ '0 0 15px ' + matrix[row][col] + ', 0 0 25px ' + matrix[row][col] if matrix[row][col] != '#000000' else 'inset 0 0 5px rgba(0,0,0,0.5)' }}"></div>
// Clamp the values to 255 and convert to integer {% endfor %}
r = Math.min(255, Math.round(r)); {% endfor %}
g = Math.min(255, Math.round(g)); </div>
b = Math.min(255, Math.round(b));
<div class="slider-controls">
// Convert to hex string <div class="center-lamp-control">
var toHex = (c) => ('0' + c.toString(16)).slice(-2); <h2>Center Lamp</h2>
return '#' + toHex(r) + toHex(g) + toHex(b); <div class="slider-group center-slider-group">
} <div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
function updateLampUI(lamp, colorState) { <input type="range" id="center-ww-slider" min="0" max="255" value="0" class="white-3000k">
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue); <input type="number" id="center-ww-number" min="0" max="255" value="0">
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`); </div>
lampElement.css('background-color', newColor); <div class="slider-row">
if (newColor === '#000000') { <span class="slider-label">Cool White (6500K)</span>
lampElement.removeClass('on'); <input type="range" id="center-cw-slider" min="0" max="255" value="0" class="white-6500k">
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`); <input type="number" id="center-cw-number" min="0" max="255" value="0">
} else { </div>
lampElement.addClass('on'); <div class="slider-row">
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`); <span class="slider-label">Blue</span>
} <input type="range" id="center-blue-slider" min="0" max="255" value="0" class="blue">
} <input type="number" id="center-blue-number" min="0" max="255" value="0">
</div>
// Function to update the UI and send the full matrix state to the backend </div>
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) { </div>
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww, <div class="control-panel">
cw: lamp.cw, <h2>Selected Region</h2>
blue: lamp.blue <div class="slider-group region-slider-group">
}))); <div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
$.ajax({ <input type="range" id="ww-slider" min="0" max="255" value="0" class="white-3000k">
url: '/set_matrix', <input type="number" id="ww-number" min="0" max="255" value="0">
type: 'POST', </div>
contentType: 'application/json', <div class="slider-row">
data: JSON.stringify({ matrix: fullMatrixData }), <span class="slider-label">Cool White (6500K)</span>
success: function(response) { <input type="range" id="cw-slider" min="0" max="255" value="0" class="white-6500k">
if (response.success) { <input type="number" id="cw-number" min="0" max="255" value="0">
if (isRegionUpdate) { </div>
// On a region button click, update the entire matrix UI <div class="slider-row">
for (var r = 0; r < 5; r++) { <span class="slider-label">Blue</span>
for (var c = 0; c < 5; c++) { <input type="range" id="blue-slider" min="0" max="255" value="0" class="blue">
updateLampUI({row: r, col: c}, lampMatrixState[r][c]); <input type="number" id="blue-number" min="0" max="255" value="0">
} </div>
} </div>
} else { </div>
// Otherwise, just update the lamps that changed </div>
lampsToUpdate.forEach(function(lamp) { </div>
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]); </div>
}); </body>
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
// Exclude the center lamp from the 'All' region
var allRegionWithoutCenter = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
regionMaps['All'] = allRegionWithoutCenter;
// Initialize lampMatrixState from the initial HTML colors
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: rgb[0], cw: rgb[1], blue: rgb[2]
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
// Toggle the inactive state of the control panel based on selection
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
// Clear selected class from all lamps
$('.lamp').removeClass('selected');
// Get the current slider values to use as the new default
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
// Reset all lamps except the center to black in our state
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
lampMatrixState[2][2] = centerLampState; // Preserve center lamp state
// Set newly selected lamps to the current slider values
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
// Update sliders to reflect the state of the first selected lamp
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
// Send the full matrix state
sendFullMatrixUpdate(lampsToUpdate, true);
});
// Event listener for the region sliders and number inputs
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
// Event listener for the center lamp sliders and number inputs
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Initial check to set the inactive state
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
});
</script>
</head>
<body>
<div class="container">
<h1>Lamp Matrix Control</h1>
<div class="region-control">
<label for="region-select">Select Region:</label>
<select id="region-select">
<option value="" disabled selected>-- Select a region --</option>
<option value="Upper">Upper</option>
<option value="Lower">Lower</option>
<option value="Left">Left</option>
<option value="Right">Right</option>
<option value="Inner ring">Inner ring</option>
<option value="Outer ring">Outer ring</option>
<option value="All">All</option>
</select>
</div>
<div class="main-content">
<div class="matrix-grid">
{% for row in range(5) %}
{% for col in range(5) %}
<div class="lamp" data-row="{{ row }}" data-col="{{ col }}" style="background-color: {{ matrix[row][col] }}; box-shadow: {{ '0 0 15px ' + matrix[row][col] + ', 0 0 25px ' + matrix[row][col] if matrix[row][col] != '#000000' else 'inset 0 0 5px rgba(0,0,0,0.5)' }}"></div>
{% endfor %}
{% endfor %}
</div>
<div class="slider-controls">
<div class="center-lamp-control">
<h2>Center Lamp</h2>
<div class="slider-group center-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="center-ww-slider" min="0" max="255" value="0" class="white-3000k">
<input type="number" id="center-ww-number" min="0" max="255" value="0">
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="center-cw-slider" min="0" max="255" value="0" class="white-6500k">
<input type="number" id="center-cw-number" min="0" max="255" value="0">
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="center-blue-slider" min="0" max="255" value="0" class="blue">
<input type="number" id="center-blue-number" min="0" max="255" value="0">
</div>
</div>
</div>
<div class="control-panel">
<h2>Selected Region</h2>
<div class="slider-group region-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="ww-slider" min="0" max="255" value="0" class="white-3000k">
<input type="number" id="ww-number" min="0" max="255" value="0">
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="cw-slider" min="0" max="255" value="0" class="white-6500k">
<input type="number" id="cw-number" min="0" max="255" value="0">
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="blue-slider" min="0" max="255" value="0" class="blue">
<input type="number" id="blue-number" min="0" max="255" value="0">
</div>
</div>
</div>
</div>
</div>
</div>
</body>
</html> </html>

View File

@ -0,0 +1,287 @@
import sys
import platform
import os
class VisionSystem:
"""
The main class for the vision system, responsible for pupil segmentation.
It uses a platform-specific backend for the actual implementation.
"""
def __init__(self, config):
self.config = config
self._backend = self._initialize_backend()
def _initialize_backend(self):
"""
Initializes the appropriate backend based on the environment and OS.
"""
# If in a test environment, use the MockBackend
if os.environ.get("PUPILOMETER_ENV") == "test":
print("Initializing Mock backend for testing...")
return MockBackend(self.config)
os_name = platform.system()
if os_name == "Linux" or os_name == "Windows":
# On Jetson (Linux) or Windows, try to use the DeepStream backend
print("Initializing DeepStream backend...")
try:
return DeepStreamBackend(self.config)
except ImportError as e:
print(f"Could not initialize DeepStreamBackend: {e}")
raise e
elif os_name == "Darwin":
# On macOS, use the Python-based backend
print("Initializing Python backend for macOS...")
try:
return PythonBackend(self.config)
except ImportError as e:
print(f"Could not initialize PythonBackend: {e}")
raise e
else:
raise NotImplementedError(f"Unsupported operating system: {os_name}")
def start(self):
"""
Starts the vision system.
"""
self._backend.start()
def stop(self):
"""
Stops the vision system.
"""
self._backend.stop()
def get_pupil_data(self):
"""
Returns the latest pupil segmentation data.
"""
return self._backend.get_pupil_data()
class MockBackend:
"""
A mock backend for testing purposes.
"""
def __init__(self, config):
self.config = config
print("MockBackend initialized.")
def start(self):
print("MockBackend started.")
pass
def stop(self):
print("MockBackend stopped.")
pass
def get_pupil_data(self):
print("Getting pupil data from MockBackend.")
return {
"pupil_position": (123, 456),
"pupil_diameter": 789,
"info": "mock_data"
}
class DeepStreamBackend:
"""
A class to handle pupil segmentation on Jetson/Windows using DeepStream.
"""
def __init__(self, config):
"""
Initializes the DeepStreamBackend.
Args:
config (dict): A dictionary containing configuration parameters.
"""
from deepstream_pipeline import DeepStreamPipeline
self.config = config
self.pipeline = DeepStreamPipeline(config)
print("DeepStreamBackend initialized.")
def start(self):
"""
Starts the DeepStream pipeline.
"""
self.pipeline.start()
print("DeepStreamBackend started.")
def stop(self):
"""
Stops the DeepStream pipeline.
"""
self.pipeline.stop()
print("DeepStreamBackend stopped.")
def get_pupil_data(self):
"""
Retrieves pupil data from the DeepStream pipeline.
"""
return self.pipeline.get_data()
class PythonBackend:
"""
A class to handle pupil segmentation on macOS using pypylon and ONNX Runtime.
"""
def __init__(self, config):
"""
Initializes the PythonBackend.
Args:
config (dict): A dictionary containing configuration parameters
such as 'model_path'.
"""
self.config = config
self.camera = None
self.inference_session = None
print("PythonBackend initialized.")
def start(self):
"""
Initializes the Basler camera and loads the ONNX model.
"""
try:
from pypylon import pylon
except ImportError:
raise ImportError("pypylon is not installed. Cannot start PythonBackend.")
try:
import onnxruntime
except ImportError:
raise ImportError("onnxruntime is not installed. Cannot start PythonBackend.")
try:
# Initialize the camera
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
# Start grabbing continuously
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
print("PythonBackend: Basler camera opened and started grabbing.")
except Exception as e:
print(f"PythonBackend: Error opening Basler camera: {e}")
self.camera = None
try:
# Load the ONNX model
self.inference_session = onnxruntime.InferenceSession(self.config['model_path'])
print(f"PythonBackend: ONNX model loaded from {self.config['model_path']}.")
except Exception as e:
print(f"PythonBackend: Error loading ONNX model: {e}")
self.inference_session = None
print("PythonBackend started.")
def stop(self):
"""
Releases the camera resources.
"""
if self.camera and self.camera.IsGrabbing():
self.camera.StopGrabbing()
print("PythonBackend: Basler camera stopped grabbing.")
if self.camera and self.camera.IsOpen():
self.camera.Close()
print("PythonBackend: Basler camera closed.")
print("PythonBackend stopped.")
def _postprocess_output(self, outputs, original_image_shape):
"""
Post-processes the raw output from the YOLOv10 model.
Args:
outputs (list): A list of numpy arrays representing the model's output.
original_image_shape (tuple): The shape of the original image (height, width).
Returns:
dict: A dictionary containing the processed pupil data.
"""
# TODO: Implement the actual post-processing logic.
# This will involve non-maximum suppression (NMS) and parsing the
# bounding boxes and segmentation masks.
print("Post-processing model output...")
pupil_data = {
"raw_model_output_shape": [o.shape for o in outputs],
"pupil_position": (100, 120), # Placeholder
"pupil_diameter": 30, # Placeholder
"bounding_box": [50, 70, 150, 170] # Placeholder [x1, y1, x2, y2]
}
return pupil_data
def get_pupil_data(self):
"""
Grabs a frame from the camera, runs inference, and returns pupil data.
"""
if not self.camera or not self.camera.IsGrabbing():
print("PythonBackend: Camera not ready.")
return None
if not self.inference_session:
print("PythonBackend: Inference session not ready.")
return None
grab_result = None
try:
import cv2
import numpy as np
from pypylon import pylon
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
image = grab_result.Array
original_shape = image.shape
# Image preprocessing
if len(image.shape) == 2:
image = cv2.cvtColor(image, cv2.COLOR_BAYER_BG2RGB)
input_shape = (640, 640)
resized_image = cv2.resize(image, input_shape)
normalized_image = resized_image.astype(np.float32) / 255.0
transposed_image = np.transpose(normalized_image, (2, 0, 1))
input_tensor = np.expand_dims(transposed_image, axis=0)
# Run inference
input_name = self.inference_session.get_inputs()[0].name
output_names = [o.name for o in self.inference_session.get_outputs()]
outputs = self.inference_session.run(output_names, {input_name: input_tensor})
# Post-process the output
pupil_data = self._postprocess_output(outputs, original_shape)
return pupil_data
else:
print(f"PythonBackend: Error grabbing frame: {grab_result.ErrorCode} {grab_result.ErrorDescription}")
return None
except Exception as e:
print(f"PythonBackend: An error occurred during frame grabbing or inference: {e}")
return None
finally:
if grab_result:
grab_result.Release()
if __name__ == '__main__':
# Example usage
config = {"camera_id": 0, "model_path": "yolov10.onnx"}
try:
vision_system = VisionSystem(config)
vision_system.start()
# In a real application, this would run in a loop
pupil_data = vision_system.get_pupil_data()
print(f"Received pupil data: {pupil_data}")
vision_system.stop()
except NotImplementedError as e:
print(e)
except Exception as e:
print(f"An error occurred: {e}")

View File

@ -1,401 +1,401 @@
// Include Section // Include Section
#include "esp_dmx.h" #include "esp_dmx.h"
#include "rdm/controller.h" #include "rdm/controller.h"
#include "rdm/responder.h" #include "rdm/responder.h"
#include "UUID.h" #include "UUID.h"
#include "EEPROM.h" #include "EEPROM.h"
#define INTERRUPT_PIN 0 #define INTERRUPT_PIN 0
#include <BLEDevice.h> #include <BLEDevice.h>
#include <BLEServer.h> #include <BLEServer.h>
#include <BLEUtils.h> #include <BLEUtils.h>
#include <BLE2902.h> #include <BLE2902.h>
bool debugMode = true; bool debugMode = true;
int bleCharCount; int bleCharCount;
const int channelPerLamp = 4; const int channelPerLamp = 4;
const int expectedLampCount = 25; const int expectedLampCount = 25;
const int dmxPacketSize = channelPerLamp * expectedLampCount + 1; // const int dmxPacketSize = channelPerLamp * expectedLampCount + 1; //
struct Button { struct Button {
const uint8_t PIN; const uint8_t PIN;
uint32_t numberKeyPresses; uint32_t numberKeyPresses;
bool pressed; bool pressed;
}; };
uint8_t dmxData[DMX_PACKET_SIZE] = {0}; uint8_t dmxData[DMX_PACKET_SIZE] = {0};
BLEServer* pServer = NULL; BLEServer* pServer = NULL;
bool deviceConnected = false; bool deviceConnected = false;
bool oldDeviceConnected = false; bool oldDeviceConnected = false;
uint16_t SERVICE_UUID = 20241115; uint16_t SERVICE_UUID = 20241115;
const int panelAmount = 25; const int panelAmount = 25;
BLECharacteristic* pCharacteristics[panelAmount]; BLECharacteristic* pCharacteristics[panelAmount];
char* CHARACTERISTIC_UUIDS[panelAmount]; char* CHARACTERISTIC_UUIDS[panelAmount];
class MyServerCallbacks: public BLEServerCallbacks { class MyServerCallbacks: public BLEServerCallbacks {
void onConnect(BLEServer* pServer) { void onConnect(BLEServer* pServer) {
deviceConnected = true; deviceConnected = true;
}; };
void onDisconnect(BLEServer* pServer) { void onDisconnect(BLEServer* pServer) {
deviceConnected = false; deviceConnected = false;
} }
}; };
// Defining BOOT button on ESP32 as our built-in button. // Defining BOOT button on ESP32 as our built-in button.
Button button1 = {INTERRUPT_PIN, 0, false}; Button button1 = {INTERRUPT_PIN, 0, false};
int mode = 0; int mode = 0;
const int modeAmount = 16; const int modeAmount = 16;
uint8_t brightnessMax = 20; uint8_t brightnessMax = 20;
uint8_t universalBrightness = 10; uint8_t universalBrightness = 10;
uint8_t dataSeq[modeAmount][DMX_PACKET_SIZE] = uint8_t dataSeq[modeAmount][DMX_PACKET_SIZE] =
{ {
{ {
0, 0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0 0,0,0,0
}, },
{ {
0, 0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0 0,0,0,0
}, },
{ {
0, 0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0 0,0,0,0
}, },
{ {
0, 0,
universalBrightness,0,0,0, // Orange universalBrightness,0,0,0, // Orange
0,universalBrightness,0,0, // White 0,universalBrightness,0,0, // White
0,universalBrightness,0,0, // White 0,universalBrightness,0,0, // White
0,0,universalBrightness,0, // Blue 0,0,universalBrightness,0, // Blue
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,universalBrightness,0,0, 0,universalBrightness,0,0,
//End Inner Round //End Inner Round
//Start Outer Round //Start Outer Round
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0 0,0,0,0
}, },
{ {
0, 0,
//Start Inner Round //Start Inner Round
0,0,universalBrightness,0, 0,0,universalBrightness,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
0,0,0,0, 0,0,0,0,
//End Inner Round //End Inner Round
//Start Outer Round //Start Outer Round
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0, universalBrightness,0,0,0,
universalBrightness,0,0,0 universalBrightness,0,0,0
} }
} }
; ;
void IRAM_ATTR isr() { void IRAM_ATTR isr() {
button1.pressed = true; button1.pressed = true;
}; };
void ledBlink(int interval, int pinNumber) { void ledBlink(int interval, int pinNumber) {
digitalWrite( digitalWrite(
pinNumber, pinNumber,
!digitalRead(pinNumber) !digitalRead(pinNumber)
); );
delay(interval); delay(interval);
}; };
void dmxSetup() { void dmxSetup() {
const dmx_port_t dmx_num = DMX_NUM_1; const dmx_port_t dmx_num = DMX_NUM_1;
Serial.printf("\nSetting up DMX Port %d", dmx_num); Serial.printf("\nSetting up DMX Port %d", dmx_num);
// First, use the default DMX configuration... // First, use the default DMX configuration...
dmx_config_t config = DMX_CONFIG_DEFAULT; dmx_config_t config = DMX_CONFIG_DEFAULT;
// Declare Personality RGBW // Declare Personality RGBW
const int personality_count = 1; const int personality_count = 1;
Serial.print("\nDefining DMX Personality... "); Serial.print("\nDefining DMX Personality... ");
dmx_personality_t personalities[] = { dmx_personality_t personalities[] = {
{4, "RGBW"} {4, "RGBW"}
}; };
Serial.print("Done"); Serial.print("Done");
Serial.print("\nInstalling DMX Driver... "); Serial.print("\nInstalling DMX Driver... ");
// ...install the DMX driver... // ...install the DMX driver...
dmx_driver_install(dmx_num, &config, personalities, personality_count); dmx_driver_install(dmx_num, &config, personalities, personality_count);
Serial.print("Done"); Serial.print("Done");
// ...and then set the communication pins! // ...and then set the communication pins!
const int tx_pin = 23; const int tx_pin = 23;
const int rx_pin = 22; const int rx_pin = 22;
const int rts_pin = 21; const int rts_pin = 21;
Serial.printf("\nSetting up pin %d as Transmit Pin, pin %d as Receive Pin and pin %d as RTS Pin... ", tx_pin, rx_pin, rts_pin); Serial.printf("\nSetting up pin %d as Transmit Pin, pin %d as Receive Pin and pin %d as RTS Pin... ", tx_pin, rx_pin, rts_pin);
dmx_set_pin(dmx_num, tx_pin, rx_pin, rts_pin); dmx_set_pin(dmx_num, tx_pin, rx_pin, rts_pin);
Serial.print("Done\n"); Serial.print("Done\n");
} }
void serialRead(){ void serialRead(){
String incomingByte; String incomingByte;
if (Serial.available() > 0) { if (Serial.available() > 0) {
// read the incoming byte: // read the incoming byte:
incomingByte = Serial.readStringUntil('\r\n'); incomingByte = Serial.readStringUntil('\r\n');
Serial.print("\nI received: "); Serial.print("\nI received: ");
Serial.print(incomingByte); Serial.print(incomingByte);
mode = incomingByte.toInt(); mode = incomingByte.toInt();
} }
} }
void setup() { void setup() {
Serial.begin(115200); Serial.begin(115200);
delay(2000); delay(2000);
Serial.print("\nIf you receive this message, ESP32 module has finished setting up Serial Interface for communication."); Serial.print("\nIf you receive this message, ESP32 module has finished setting up Serial Interface for communication.");
pinMode(INTERRUPT_PIN, INPUT_PULLUP); pinMode(INTERRUPT_PIN, INPUT_PULLUP);
attachInterrupt(digitalPinToInterrupt(INTERRUPT_PIN), isr, RISING); attachInterrupt(digitalPinToInterrupt(INTERRUPT_PIN), isr, RISING);
//Begin of the DMX Setup //Begin of the DMX Setup
const dmx_port_t dmx_num = DMX_NUM_1; const dmx_port_t dmx_num = DMX_NUM_1;
dmxSetup(); dmxSetup();
Serial.println("Welcome to Pupilometer LED Billboard!"); Serial.println("Welcome to Pupilometer LED Billboard!");
const int array_size = 25; const int array_size = 25;
rdm_uid_t uids[array_size]; rdm_uid_t uids[array_size];
// This function blocks and may take some time to complete! // This function blocks and may take some time to complete!
Serial.printf("Attempting to Discover the Existing DMX Network... "); Serial.printf("Attempting to Discover the Existing DMX Network... ");
int num_uids = rdm_discover_devices_simple(DMX_NUM_1, uids, array_size); int num_uids = rdm_discover_devices_simple(DMX_NUM_1, uids, array_size);
Serial.printf("Done!\n"); Serial.printf("Done!\n");
Serial.printf("Discovery found %i UIDs as following:\n", num_uids); Serial.printf("Discovery found %i UIDs as following:\n", num_uids);
for (int i = 0; i < num_uids; i++){ for (int i = 0; i < num_uids; i++){
printf(UIDSTR "\n", UID2STR(uids[i])); printf(UIDSTR "\n", UID2STR(uids[i]));
}; };
// Create the BLE Device // Create the BLE Device
BLEDevice::init("Pupilometer LED Billboard"); BLEDevice::init("Pupilometer LED Billboard");
// Create the BLE Server // Create the BLE Server
pServer = BLEDevice::createServer(); pServer = BLEDevice::createServer();
pServer->setCallbacks(new MyServerCallbacks()); pServer->setCallbacks(new MyServerCallbacks());
// Create the BLE Service // Create the BLE Service
BLEService *pService = pServer->createService(SERVICE_UUID,52); BLEService *pService = pServer->createService(SERVICE_UUID,52);
const bool debugMode = false; const bool debugMode = false;
// Serial.printf(debugMode); // Serial.printf(debugMode);
// Create a BLE Characteristic // Create a BLE Characteristic
Serial.printf("\nCalculating BLE Charateristic Count"); Serial.printf("\nCalculating BLE Charateristic Count");
bleCharCount = (panelAmount * debugMode) + !debugMode; bleCharCount = (panelAmount * debugMode) + !debugMode;
Serial.printf("\nCalculating BLE MTU ..."); Serial.printf("\nCalculating BLE MTU ...");
uint16_t bleMTU = ((panelAmount * 3) / bleCharCount) + 3; uint16_t bleMTU = ((panelAmount * 3) / bleCharCount) + 3;
Serial.printf("\nSetting BLE MTU to %i bytes... ", bleMTU); Serial.printf("\nSetting BLE MTU to %i bytes... ", bleMTU);
BLEDevice::setMTU(bleMTU + 3); BLEDevice::setMTU(bleMTU + 3);
Serial.printf("Done!\n"); Serial.printf("Done!\n");
for (uint32_t i = 0; i < bleCharCount; i++){ for (uint32_t i = 0; i < bleCharCount; i++){
//UUID uuid; //UUID uuid;
//uuid.seed(i+1); //uuid.seed(i+1);
//uuid.generate(); //uuid.generate();
//Serial.printf("Creating BLE Characteristic with UUID %s ...", BLEUUID(i+1)); //Serial.printf("Creating BLE Characteristic with UUID %s ...", BLEUUID(i+1));
pCharacteristics[i] = pService->createCharacteristic( pCharacteristics[i] = pService->createCharacteristic(
i+1, i+1,
// BLEUUID(uuid.toCharArray()), // BLEUUID(uuid.toCharArray()),
BLECharacteristic::PROPERTY_READ | BLECharacteristic::PROPERTY_READ |
BLECharacteristic::PROPERTY_WRITE | BLECharacteristic::PROPERTY_WRITE |
BLECharacteristic::PROPERTY_NOTIFY | BLECharacteristic::PROPERTY_NOTIFY |
BLECharacteristic::PROPERTY_INDICATE BLECharacteristic::PROPERTY_INDICATE
); );
Serial.printf("Created BLE Characteristic with UUID %s ...", pCharacteristics[i]->getUUID().toString().c_str()); Serial.printf("Created BLE Characteristic with UUID %s ...", pCharacteristics[i]->getUUID().toString().c_str());
// pCharacteristics[i]->addDescriptor(new BLE2902()); // pCharacteristics[i]->addDescriptor(new BLE2902());
// Serial.printf("Done\n"); // Serial.printf("Done\n");
}; };
// Start the service // Start the service
pService->start(); pService->start();
// Start advertising // Start advertising
BLEAdvertising *pAdvertising = BLEDevice::getAdvertising(); BLEAdvertising *pAdvertising = BLEDevice::getAdvertising();
pAdvertising->addServiceUUID(SERVICE_UUID); pAdvertising->addServiceUUID(SERVICE_UUID);
pAdvertising->setScanResponse(false); pAdvertising->setScanResponse(false);
pAdvertising->setMinPreferred(0x0); // set value to 0x00 to not advertise this parameter pAdvertising->setMinPreferred(0x0); // set value to 0x00 to not advertise this parameter
BLEDevice::startAdvertising(); BLEDevice::startAdvertising();
} }
void loop() { void loop() {
// Save Old Mode // Save Old Mode
int modeOld = mode; int modeOld = mode;
int msgSize; int msgSize;
uint8_t* btMessage[bleCharCount]; uint8_t* btMessage[bleCharCount];
// uint8_t dmxData[DMX_PACKET_SIZE] = {0}; // uint8_t dmxData[DMX_PACKET_SIZE] = {0};
// notify changed value // notify changed value
if (deviceConnected) { if (deviceConnected) {
} }
// disconnecting // disconnecting
if (!deviceConnected && oldDeviceConnected) { if (!deviceConnected && oldDeviceConnected) {
delay(500); // give the bluetooth stack the chance to get things ready delay(500); // give the bluetooth stack the chance to get things ready
pServer->startAdvertising(); // restart advertising pServer->startAdvertising(); // restart advertising
Serial.println("Start advertising"); Serial.println("Start advertising");
oldDeviceConnected = deviceConnected; oldDeviceConnected = deviceConnected;
} }
// connecting // connecting
if (deviceConnected && !oldDeviceConnected) { if (deviceConnected && !oldDeviceConnected) {
// do stuff here on connecting // do stuff here on connecting
oldDeviceConnected = deviceConnected; oldDeviceConnected = deviceConnected;
} }
// Serial.printf("\nConstructing Payload using "); // Serial.printf("\nConstructing Payload using ");
// Serial.printf("Bluetooth Data ..."); // Serial.printf("Bluetooth Data ...");
if (button1.pressed){ if (button1.pressed){
if (mode < modeAmount - 1){mode++;} else {mode = 0;}; if (mode < modeAmount - 1){mode++;} else {mode = 0;};
// Increment the value of each slot, excluding the start code. // Increment the value of each slot, excluding the start code.
button1.pressed = false; // Reset button status to FALSE button1.pressed = false; // Reset button status to FALSE
}; };
serialRead(); serialRead();
if (modeOld != mode){ if (modeOld != mode){
Serial.printf("\nChanging Lighting Preset to Preset %d", mode); Serial.printf("\nChanging Lighting Preset to Preset %d", mode);
uint8_t lampData[DMX_PACKET_SIZE / 4 * 3]; uint8_t lampData[DMX_PACKET_SIZE / 4 * 3];
Serial.printf("\nDetected preset %i size: %i", mode, sizeof(dataSeq[mode])); Serial.printf("\nDetected preset %i size: %i", mode, sizeof(dataSeq[mode]));
for (int i = 0; i < sizeof(dataSeq[mode]); i++){ for (int i = 0; i < sizeof(dataSeq[mode]); i++){
dmxData[i] = dataSeq[mode][i]; dmxData[i] = dataSeq[mode][i];
int sublampIndex = i % 4; int sublampIndex = i % 4;
//Serial.printf("[%i]", sublampIndex, j); //Serial.printf("[%i]", sublampIndex, j);
if (sublampIndex > 0) { if (sublampIndex > 0) {
int j = (i / 4) * 3 + sublampIndex - 1; int j = (i / 4) * 3 + sublampIndex - 1;
Serial.printf("[%i](%i)", j, sublampIndex); Serial.printf("[%i](%i)", j, sublampIndex);
lampData[j] = dataSeq[mode][i]; lampData[j] = dataSeq[mode][i];
} }
}; };
pCharacteristics[0]->setValue(lampData, expectedLampCount * 3); pCharacteristics[0]->setValue(lampData, expectedLampCount * 3);
} }
Serial.printf("\nConstructing DMX Payload with size "); Serial.printf("\nConstructing DMX Payload with size ");
for (int i = 0; i < bleCharCount; i++){ for (int i = 0; i < bleCharCount; i++){
btMessage[i] = pCharacteristics[i]->getData(); btMessage[i] = pCharacteristics[i]->getData();
msgSize = pCharacteristics[i]->getLength(); msgSize = pCharacteristics[i]->getLength();
Serial.printf("%i bytes ", msgSize); Serial.printf("%i bytes ", msgSize);
for (int j = 0; j < msgSize; j++){ for (int j = 0; j < msgSize; j++){
int packet = btMessage[i][j]; int packet = btMessage[i][j];
int lampSum = i*3 + j; int lampSum = i*3 + j;
int dmxAddress = (lampSum / 3) * 4 + lampSum % 3 + 1; int dmxAddress = (lampSum / 3) * 4 + lampSum % 3 + 1;
dmxData[dmxAddress] = packet; dmxData[dmxAddress] = packet;
// Serial.printf("[[%i,%i] %i - %i] ",i , j, dmxAddress, packet); // Serial.printf("[[%i,%i] %i - %i] ",i , j, dmxAddress, packet);
}; };
}; };
Serial.printf("\n"); Serial.printf("\n");
// Serial.printf(" Done"); // Serial.printf(" Done");
// Wait until the packet is finished being sent before proceeding. // Wait until the packet is finished being sent before proceeding.
dmx_wait_sent(DMX_NUM_1, DMX_TIMEOUT_TICK); dmx_wait_sent(DMX_NUM_1, DMX_TIMEOUT_TICK);
// Now write the packet synchronously! // Now write the packet synchronously!
dmx_write(DMX_NUM_1, dmxData, DMX_PACKET_SIZE); dmx_write(DMX_NUM_1, dmxData, DMX_PACKET_SIZE);
dmx_send(DMX_NUM_1); dmx_send(DMX_NUM_1);
} }

100
tests/test_e2e.py Normal file
View File

@ -0,0 +1,100 @@
import pytest
import subprocess
import time
import requests
import os
import sys
from playwright.sync_api import Page, expect
# Define the host and port for the application
HOST = "127.0.0.1"
PORT = 5000
BASE_URL = f"http://{HOST}:{PORT}"
STDOUT_FILE = "app_stdout.log"
STDERR_FILE = "app_stderr.log"
@pytest.fixture(scope="module")
def run_app():
"""
Fixture to run the Flask application in a test environment.
"""
# Set the environment variable for the subprocess
env = os.environ.copy()
env["PUPILOMETER_ENV"] = "test"
command = [sys.executable, "-u", "app.py"]
with open(STDOUT_FILE, "w") as stdout_f, open(STDERR_FILE, "w") as stderr_f:
process = subprocess.Popen(
command,
cwd="src/controllerSoftware",
stdout=stdout_f,
stderr=stderr_f,
text=True,
env=env
)
# Wait for the app to start
start_time = time.time()
while True:
if os.path.exists(STDERR_FILE):
with open(STDERR_FILE, "r") as f:
if "* Running on http" in f.read():
break
if time.time() - start_time > 15:
raise TimeoutError("Flask app failed to start in time.")
time.sleep(0.5)
yield process
process.terminate()
process.wait()
if os.path.exists(STDOUT_FILE):
os.remove(STDOUT_FILE)
if os.path.exists(STDERR_FILE):
os.remove(STDERR_FILE)
def test_program_output(run_app):
"""
Tests that the mock backend is initialized.
"""
with open(STDOUT_FILE, "r") as f:
stdout = f.read()
assert "Initializing Mock backend for testing..." in stdout
assert "MockBackend initialized." in stdout
def test_curl_output(run_app):
"""
Tests the API endpoints using requests (similar to curl).
"""
# Test the /ble_status endpoint
response_ble = requests.get(f"{BASE_URL}/ble_status")
assert response_ble.status_code == 200
assert response_ble.json() == {"connected": True} # In DEBUG_MODE
# Test the /vision/pupil_data endpoint
response_vision = requests.get(f"{BASE_URL}/vision/pupil_data")
assert response_vision.status_code == 200
assert "data" in response_vision.json()
assert "success" in response_vision.json()
def test_playwright_checks(page: Page, run_app):
"""
Performs basic and visual checks using Playwright.
"""
page.goto(BASE_URL)
# Basic output check: Title and heading
expect(page).to_have_title("Lamp Matrix Control")
heading = page.locator("h1")
expect(heading).to_have_text("Lamp Matrix Control")
# Visual check: Screenshot
os.makedirs("screenshots", exist_ok=True)
screenshot_path = "screenshots/homepage.png"
page.screenshot(path=screenshot_path)
assert os.path.exists(screenshot_path)

95
tests/test_vision.py Normal file
View File

@ -0,0 +1,95 @@
import unittest
from unittest.mock import patch
import sys
import os
# Add the src/controllerSoftware directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware')))
from vision import VisionSystem, DeepStreamBackend, PythonBackend
class TestVisionSystem(unittest.TestCase):
"""
Unit tests for the VisionSystem class.
"""
def setUp(self):
"""
Set up a VisionSystem instance with a mocked backend for each test.
"""
self.config = {"camera_id": 0, "model_path": "yolov10.onnx"}
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_initialization_linux(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Linux.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_initialization_windows(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Windows.
"""
mock_system.return_value = 'Windows'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
@patch('vision.PythonBackend')
def test_initialization_macos(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the PythonBackend on macOS.
"""
mock_system.return_value = 'Darwin'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
def test_initialization_unsupported(self, mock_system):
"""
Test that the VisionSystem raises an exception on an unsupported OS.
"""
mock_system.return_value = 'UnsupportedOS'
with self.assertRaises(NotImplementedError):
VisionSystem(self.config)
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_start(self, mock_backend, mock_system):
"""
Test that the start method calls the backend's start method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.start()
vision_system._backend.start.assert_called_once()
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_stop(self, mock_backend, mock_system):
"""
Test that the stop method calls the backend's stop method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.stop()
vision_system._backend.stop.assert_called_once()
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_get_pupil_data(self, mock_backend, mock_system):
"""
Test that the get_pupil_data method calls the backend's get_pupil_data method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.get_pupil_data()
vision_system._backend.get_pupil_data.assert_called_once()
if __name__ == '__main__':
unittest.main()