Compare commits

...

5 Commits

16 changed files with 1759 additions and 901 deletions

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
# Virtual Environment
.venv/
# Python cache
__pycache__/
*.pyc
# Test artifacts
app_stdout.log
app_stderr.log
screenshots/

View File

@ -3,3 +3,26 @@
## Introduction ## Introduction
This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder. This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder.
## Dependencies
### Python Dependencies
The Python dependencies are listed in the `requirements.txt` file. You can install them using pip:
```bash
pip install -r requirements.txt
```
### NVIDIA DeepStream
For running the pupil segmentation on a Jetson Orin AGX or a Windows machine with an NVIDIA GPU, this project uses NVIDIA DeepStream. DeepStream is a complex dependency and cannot be installed via pip.
Please follow the official NVIDIA documentation to install DeepStream for your platform:
* **Jetson:** [DeepStream for Jetson](https://developer.nvidia.com/deepstream-sdk-jetson)
* **Windows:** [DeepStream for Windows](https://developer.nvidia.com/deepstream-sdk-windows)
You will also need to install GStreamer and the Python bindings (PyGObject). These are usually installed as part of the DeepStream installation.
Additionally, the `pyds` library, which provides Python bindings for DeepStream metadata structures, is required. This library is also included with the DeepStream SDK and may need to be installed manually.

View File

@ -1,2 +1,8 @@
bleak>="1.0.0" bleak>="1.0.0"
flask>="3.1.1" flask>="3.1.1"
pypylon>= "4.0.0"
onnxruntime>= "1.18.0"
opencv-python>= "4.9.0"
pytest>= "8.0.0"
pytest-playwright>= "0.4.0"
requests>= "2.31.0"

View File

@ -7,6 +7,7 @@ import json
import sys import sys
import signal import signal
import os import os
from vision import VisionSystem
# ================================================================================================= # =================================================================================================
# APP CONFIGURATION # APP CONFIGURATION
@ -14,15 +15,17 @@ import os
# Set to True to run without a physical BLE device for testing purposes. # Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix. # Set to False to connect to the actual lamp matrix.
DEBUG_MODE = False DEBUG_MODE = True
# --- BLE Device Configuration (Ignored in DEBUG_MODE) --- # --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
DEVICE_NAME = "Pupilometer LED Billboard" DEVICE_NAME = "Pupilometer LED Billboard"
global ble_client global ble_client
global ble_characteristics global ble_characteristics
global ble_connection_status
ble_client = None ble_client = None
ble_characteristics = None ble_characteristics = None
ble_event_loop = None # Will be initialized if not in debug mode ble_event_loop = None # Will be initialized if not in debug mode
ble_connection_status = False
# ================================================================================================= # =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode) # BLE HELPER FUNCTIONS (Used in LIVE mode)
@ -71,6 +74,7 @@ SPIRAL_MAP_5x5 = create_spiral_map(5)
async def set_full_matrix_on_ble(colorSeries): async def set_full_matrix_on_ble(colorSeries):
global ble_client global ble_client
global ble_characteristics global ble_characteristics
global ble_connection_status
if not ble_client or not ble_client.is_connected: if not ble_client or not ble_client.is_connected:
print("BLE client not connected. Attempting to reconnect...") print("BLE client not connected. Attempting to reconnect...")
@ -120,6 +124,7 @@ async def set_full_matrix_on_ble(colorSeries):
async def connect_to_ble_device(): async def connect_to_ble_device():
global ble_client global ble_client
global ble_characteristics global ble_characteristics
global ble_connection_status
print(f"Scanning for device: {DEVICE_NAME}...") print(f"Scanning for device: {DEVICE_NAME}...")
devices = await BleakScanner.discover() devices = await BleakScanner.discover()
@ -127,6 +132,7 @@ async def connect_to_ble_device():
if not target_device: if not target_device:
print(f"Device '{DEVICE_NAME}' not found.") print(f"Device '{DEVICE_NAME}' not found.")
ble_connection_status = False
return False return False
print(f"Found device: {target_device.name} ({target_device.address})") print(f"Found device: {target_device.name} ({target_device.address})")
@ -144,12 +150,15 @@ async def connect_to_ble_device():
] ]
ble_characteristics = sorted(characteristics, key=lambda char: char.handle) ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
print(f"Found {len(ble_characteristics)} characteristics for lamps.") print(f"Found {len(ble_characteristics)} characteristics for lamps.")
ble_connection_status = True
return True return True
else: else:
print(f"Failed to connect to {target_device.name}") print(f"Failed to connect to {target_device.name}")
ble_connection_status = False
return False return False
except Exception as e: except Exception as e:
print(f"An error occurred during BLE connection: {e}") print(f"An error occurred during BLE connection: {e}")
ble_connection_status = False
return False return False
# ================================================================================================= # =================================================================================================
# COLOR MIXING # COLOR MIXING
@ -255,14 +264,42 @@ def set_matrix():
print(f"Getting current lamp matrix info: {lamp_matrix}") print(f"Getting current lamp matrix info: {lamp_matrix}")
@app.route('/ble_status')
def ble_status():
global ble_connection_status
if DEBUG_MODE:
return jsonify(connected=True)
return jsonify(connected=ble_connection_status)
@app.route('/vision/pupil_data')
def get_pupil_data():
"""
Endpoint to get the latest pupil segmentation data from the vision system.
"""
if vision_system:
data = vision_system.get_pupil_data()
return jsonify(success=True, data=data)
return jsonify(success=False, message="Vision system not initialized"), 500
# ================================================================================================= # =================================================================================================
# APP STARTUP # APP STARTUP
# ================================================================================================= # =================================================================================================
vision_system = None
def signal_handler(signum, frame): def signal_handler(signum, frame):
print("Received shutdown signal, gracefully shutting down...") print("Received shutdown signal, gracefully shutting down...")
global ble_connection_status
# Stop the vision system
if vision_system:
print("Stopping vision system...")
vision_system.stop()
print("Vision system stopped.")
if not DEBUG_MODE and ble_client and ble_client.is_connected: if not DEBUG_MODE and ble_client and ble_client.is_connected:
print("Disconnecting BLE client...") print("Disconnecting BLE client...")
ble_connection_status = False
disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop) disconnect_future = asyncio.run_coroutine_threadsafe(ble_client.disconnect(), ble_event_loop)
try: try:
# Wait for the disconnect to complete with a timeout # Wait for the disconnect to complete with a timeout
@ -285,6 +322,16 @@ if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
# Initialize and start the Vision System
try:
vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"}
vision_system = VisionSystem(config=vision_config)
vision_system.start()
except Exception as e:
print(f"Failed to initialize or start Vision System: {e}")
vision_system = None
if not DEBUG_MODE: if not DEBUG_MODE:
print("Starting BLE event loop in background thread...") print("Starting BLE event loop in background thread...")
ble_event_loop = asyncio.new_event_loop() ble_event_loop = asyncio.new_event_loop()

View File

View File

@ -0,0 +1,235 @@
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import pyds
import threading
try:
from pypylon import pylon
except ImportError:
print("pypylon is not installed. DeepStreamBackend will not be able to get frames from Basler camera.")
pylon = None
class DeepStreamPipeline:
"""
A class to manage the DeepStream pipeline for pupil segmentation.
"""
def __init__(self, config):
self.config = config
Gst.init(None)
self.pipeline = None
self.loop = GLib.MainLoop()
self.pupil_data = None
self.camera = None
self.frame_feeder_thread = None
self.is_running = False
print("DeepStreamPipeline initialized.")
def _frame_feeder_thread(self, appsrc):
"""
Thread function to feed frames from the Basler camera to the appsrc element.
"""
while self.is_running:
if not self.camera or not self.camera.IsGrabbing():
print("Camera not ready, stopping frame feeder.")
break
try:
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
frame = grab_result.Array
# Create a Gst.Buffer
buf = Gst.Buffer.new_allocate(None, len(frame), None)
buf.fill(0, frame)
# Push the buffer into the appsrc
appsrc.emit('push-buffer', buf)
else:
print(f"Error grabbing frame: {grab_result.ErrorCode}")
except Exception as e:
print(f"An error occurred in frame feeder thread: {e}")
break
finally:
if 'grab_result' in locals() and grab_result:
grab_result.Release()
def bus_call(self, bus, message, loop):
"""
Callback function for handling messages from the GStreamer bus.
"""
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
self.is_running = False
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s\n" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
self.is_running = False
loop.quit()
return True
def pgie_sink_pad_buffer_probe(self, pad, info, u_data):
"""
Probe callback function for the sink pad of the pgie element.
"""
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the address of gst_buffer as input, which is a ptr.
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
frame_meta = pyds.glist_get_data(l_frame)
except StopIteration:
break
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.glist_get_data(l_obj)
except StopIteration:
break
# Access and process object metadata
rect_params = obj_meta.rect_params
top = rect_params.top
left = rect_params.left
width = rect_params.width
height = rect_params.height
self.pupil_data = {
"bounding_box": [left, top, left + width, top + height],
"confidence": obj_meta.confidence
}
print(f"Pupil detected: {self.pupil_data}")
try:
l_obj = l_obj.next
except StopIteration:
break
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def start(self):
"""
Builds and starts the DeepStream pipeline.
"""
if not pylon:
raise ImportError("pypylon is not installed. Cannot start DeepStreamPipeline with Basler camera.")
# Initialize camera
try:
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
print("DeepStreamPipeline: Basler camera opened and started grabbing.")
except Exception as e:
print(f"DeepStreamPipeline: Error opening Basler camera: {e}")
return
self.pipeline = Gst.Pipeline()
if not self.pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
return
source = Gst.ElementFactory.make("appsrc", "app-source")
# ... (element creation remains the same)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
sink = Gst.ElementFactory.make("fakesink", "sink")
videoconvert = Gst.ElementFactory.make("nvvideoconvert", "nv-videoconvert")
# Set appsrc properties
# TODO: Set caps based on camera properties
caps = Gst.Caps.from_string("video/x-raw,format=GRAY8,width=1280,height=720,framerate=30/1")
source.set_property("caps", caps)
source.set_property("format", "time")
pgie.set_property('config-file-path', "pgie_yolov10_config.txt")
self.pipeline.add(source)
self.pipeline.add(videoconvert)
self.pipeline.add(pgie)
self.pipeline.add(sink)
if not source.link(videoconvert):
sys.stderr.write(" Unable to link source to videoconvert \n")
return
if not videoconvert.link(pgie):
sys.stderr.write(" Unable to link videoconvert to pgie \n")
return
if not pgie.link(sink):
sys.stderr.write(" Unable to link pgie to sink \n")
return
pgie_sink_pad = pgie.get_static_pad("sink")
if not pgie_sink_pad:
sys.stderr.write(" Unable to get sink pad of pgie \n")
return
pgie_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_sink_pad_buffer_probe, 0)
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.bus_call, self.loop)
self.is_running = True
self.frame_feeder_thread = threading.Thread(target=self._frame_feeder_thread, args=(source,))
self.frame_feeder_thread.start()
print("Starting pipeline...")
self.pipeline.set_state(Gst.State.PLAYING)
print("DeepStreamPipeline started.")
def stop(self):
"""
Stops the DeepStream pipeline.
"""
self.is_running = False
if self.frame_feeder_thread:
self.frame_feeder_thread.join()
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
print("DeepStreamPipeline stopped.")
if self.camera and self.camera.IsGrabbing():
self.camera.StopGrabbing()
if self.camera and self.camera.IsOpen():
self.camera.Close()
print("DeepStreamPipeline: Basler camera closed.")
def get_data(self):
"""
Retrieves data from the pipeline.
"""
return self.pupil_data
if __name__ == '__main__':
config = {}
pipeline = DeepStreamPipeline(config)
pipeline.start()
# Run the GLib main loop in the main thread
try:
pipeline.loop.run()
except KeyboardInterrupt:
print("Interrupted by user.")
pipeline.stop()

View File

View File

View File

@ -0,0 +1,18 @@
[property]
gpu-id=0
net-scale-factor=0.00392156862745098
#onnx-file=yolov10.onnx
model-engine-file=model.engine
#labelfile-path=labels.txt
batch-size=1
process-mode=1
model-color-format=0
network-mode=0
num-detected-classes=1
gie-unique-id=1
output-blob-names=output0
[class-attrs-all]
pre-cluster-threshold=0.2
eps=0.2
group-threshold=1

View File

@ -0,0 +1,285 @@
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
var selectedLamps = [];
// Function to calculate a visual RGB color from the three light values using a proper additive model
function calculateRgb(ww, cw, blue) {
// Define the RGB components for each light source based on slider track colors
const warmWhiteR = 255;
const warmWhiteG = 192;
const warmWhiteB = 128;
const coolWhiteR = 192;
const coolWhiteG = 224;
const coolWhiteB = 255;
const blueR = 0;
const blueG = 0;
const blueB = 255;
// Normalize the slider values (0-255) and apply them to the base colors
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR;
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteG + (blue / 255) * blueG;
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB;
// Clamp the values to 255 and convert to integer
r = Math.min(255, Math.round(r));
g = Math.min(255, Math.round(g));
b = Math.min(255, Math.round(b));
// Convert to hex string
var toHex = (c) => ('0' + c.toString(16)).slice(-2);
return '#' + toHex(r) + toHex(g) + toHex(b);
}
function updateLampUI(lamp, colorState) {
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue);
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`);
lampElement.css('background-color', newColor);
if (newColor === '#000000') {
lampElement.removeClass('on');
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`);
} else {
lampElement.addClass('on');
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`);
}
}
// Function to update the UI and send the full matrix state to the backend
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) {
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww,
cw: lamp.cw,
blue: lamp.blue
})));
$.ajax({
url: '/set_matrix',
type: 'POST',
contentType: 'application/json',
data: JSON.stringify({ matrix: fullMatrixData }),
success: function(response) {
if (response.success) {
if (isRegionUpdate) {
// On a region button click, update the entire matrix UI
for (var r = 0; r < 5; r++) {
for (var c = 0; c < 5; c++) {
updateLampUI({row: r, col: c}, lampMatrixState[r][c]);
}
}
} else {
// Otherwise, just update the lamps that changed
lampsToUpdate.forEach(function(lamp) {
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]);
});
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
// Exclude the center lamp from the 'All' region
var allRegionWithoutCenter = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
regionMaps['All'] = allRegionWithoutCenter;
// Initialize lampMatrixState from the initial HTML colors
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: rgb[0], cw: rgb[1], blue: rgb[2]
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
// Toggle the inactive state of the control panel based on selection
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
// Clear selected class from all lamps
$('.lamp').removeClass('selected');
// Get the current slider values to use as the new default
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
// Reset all lamps except the center to black in our state
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
lampMatrixState[2][2] = centerLampState; // Preserve center lamp state
// Set newly selected lamps to the current slider values
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
// Update sliders to reflect the state of the first selected lamp
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
// Send the full matrix state
sendFullMatrixUpdate(lampsToUpdate, true);
});
// Event listener for the region sliders and number inputs
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
// Event listener for the center lamp sliders and number inputs
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Initial check to set the inactive state
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
function checkBleStatus() {
$.ajax({
url: '/ble_status',
type: 'GET',
success: function(response) {
var statusElement = $('#ble-status');
if (response.connected) {
statusElement.text('BLE Connected');
statusElement.css('color', 'lightgreen');
} else {
statusElement.text('BLE Disconnected');
statusElement.css('color', 'red');
}
},
error: function() {
var statusElement = $('#ble-status');
statusElement.text('Reconnecting...');
statusElement.css('color', 'orange');
}
});
}
setInterval(checkBleStatus, 2000);
checkBleStatus(); // Initial check
});

View File

@ -149,3 +149,14 @@ input.blue::-webkit-slider-runnable-track { background: linear-gradient(to right
align-items: center; align-items: center;
} }
} }
#ble-status {
position: fixed;
top: 10px;
right: 10px;
font-size: 16px;
color: #fff;
background-color: #333;
padding: 5px 10px;
border-radius: 5px;
}

View File

@ -4,271 +4,11 @@
<title>Lamp Matrix Control</title> <title>Lamp Matrix Control</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}"> <link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script> <script src="{{ url_for('static', filename='script.js') }}"></script>
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
var selectedLamps = [];
// Function to calculate a visual RGB color from the three light values using a proper additive model
function calculateRgb(ww, cw, blue) {
// Define the RGB components for each light source based on slider track colors
const warmWhiteR = 255;
const warmWhiteG = 192;
const warmWhiteB = 128;
const coolWhiteR = 192;
const coolWhiteG = 224;
const coolWhiteB = 255;
const blueR = 0;
const blueG = 0;
const blueB = 255;
// Normalize the slider values (0-255) and apply them to the base colors
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR;
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteG + (blue / 255) * blueG;
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB;
// Clamp the values to 255 and convert to integer
r = Math.min(255, Math.round(r));
g = Math.min(255, Math.round(g));
b = Math.min(255, Math.round(b));
// Convert to hex string
var toHex = (c) => ('0' + c.toString(16)).slice(-2);
return '#' + toHex(r) + toHex(g) + toHex(b);
}
function updateLampUI(lamp, colorState) {
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue);
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`);
lampElement.css('background-color', newColor);
if (newColor === '#000000') {
lampElement.removeClass('on');
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`);
} else {
lampElement.addClass('on');
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`);
}
}
// Function to update the UI and send the full matrix state to the backend
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) {
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww,
cw: lamp.cw,
blue: lamp.blue
})));
$.ajax({
url: '/set_matrix',
type: 'POST',
contentType: 'application/json',
data: JSON.stringify({ matrix: fullMatrixData }),
success: function(response) {
if (response.success) {
if (isRegionUpdate) {
// On a region button click, update the entire matrix UI
for (var r = 0; r < 5; r++) {
for (var c = 0; c < 5; c++) {
updateLampUI({row: r, col: c}, lampMatrixState[r][c]);
}
}
} else {
// Otherwise, just update the lamps that changed
lampsToUpdate.forEach(function(lamp) {
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]);
});
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
// Exclude the center lamp from the 'All' region
var allRegionWithoutCenter = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
regionMaps['All'] = allRegionWithoutCenter;
// Initialize lampMatrixState from the initial HTML colors
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: rgb[0], cw: rgb[1], blue: rgb[2]
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
// Toggle the inactive state of the control panel based on selection
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
// Clear selected class from all lamps
$('.lamp').removeClass('selected');
// Get the current slider values to use as the new default
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
// Reset all lamps except the center to black in our state
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
lampMatrixState[2][2] = centerLampState; // Preserve center lamp state
// Set newly selected lamps to the current slider values
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
// Update sliders to reflect the state of the first selected lamp
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
// Send the full matrix state
sendFullMatrixUpdate(lampsToUpdate, true);
});
// Event listener for the region sliders and number inputs
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
// Event listener for the center lamp sliders and number inputs
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
// Clamp value
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Initial check to set the inactive state
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
});
</script>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
<div id="ble-status"></div>
<h1>Lamp Matrix Control</h1> <h1>Lamp Matrix Control</h1>
<div class="region-control"> <div class="region-control">
<label for="region-select">Select Region:</label> <label for="region-select">Select Region:</label>

View File

@ -0,0 +1,287 @@
import sys
import platform
import os
class VisionSystem:
"""
The main class for the vision system, responsible for pupil segmentation.
It uses a platform-specific backend for the actual implementation.
"""
def __init__(self, config):
self.config = config
self._backend = self._initialize_backend()
def _initialize_backend(self):
"""
Initializes the appropriate backend based on the environment and OS.
"""
# If in a test environment, use the MockBackend
if os.environ.get("PUPILOMETER_ENV") == "test":
print("Initializing Mock backend for testing...")
return MockBackend(self.config)
os_name = platform.system()
if os_name == "Linux" or os_name == "Windows":
# On Jetson (Linux) or Windows, try to use the DeepStream backend
print("Initializing DeepStream backend...")
try:
return DeepStreamBackend(self.config)
except ImportError as e:
print(f"Could not initialize DeepStreamBackend: {e}")
raise e
elif os_name == "Darwin":
# On macOS, use the Python-based backend
print("Initializing Python backend for macOS...")
try:
return PythonBackend(self.config)
except ImportError as e:
print(f"Could not initialize PythonBackend: {e}")
raise e
else:
raise NotImplementedError(f"Unsupported operating system: {os_name}")
def start(self):
"""
Starts the vision system.
"""
self._backend.start()
def stop(self):
"""
Stops the vision system.
"""
self._backend.stop()
def get_pupil_data(self):
"""
Returns the latest pupil segmentation data.
"""
return self._backend.get_pupil_data()
class MockBackend:
"""
A mock backend for testing purposes.
"""
def __init__(self, config):
self.config = config
print("MockBackend initialized.")
def start(self):
print("MockBackend started.")
pass
def stop(self):
print("MockBackend stopped.")
pass
def get_pupil_data(self):
print("Getting pupil data from MockBackend.")
return {
"pupil_position": (123, 456),
"pupil_diameter": 789,
"info": "mock_data"
}
class DeepStreamBackend:
"""
A class to handle pupil segmentation on Jetson/Windows using DeepStream.
"""
def __init__(self, config):
"""
Initializes the DeepStreamBackend.
Args:
config (dict): A dictionary containing configuration parameters.
"""
from deepstream_pipeline import DeepStreamPipeline
self.config = config
self.pipeline = DeepStreamPipeline(config)
print("DeepStreamBackend initialized.")
def start(self):
"""
Starts the DeepStream pipeline.
"""
self.pipeline.start()
print("DeepStreamBackend started.")
def stop(self):
"""
Stops the DeepStream pipeline.
"""
self.pipeline.stop()
print("DeepStreamBackend stopped.")
def get_pupil_data(self):
"""
Retrieves pupil data from the DeepStream pipeline.
"""
return self.pipeline.get_data()
class PythonBackend:
"""
A class to handle pupil segmentation on macOS using pypylon and ONNX Runtime.
"""
def __init__(self, config):
"""
Initializes the PythonBackend.
Args:
config (dict): A dictionary containing configuration parameters
such as 'model_path'.
"""
self.config = config
self.camera = None
self.inference_session = None
print("PythonBackend initialized.")
def start(self):
"""
Initializes the Basler camera and loads the ONNX model.
"""
try:
from pypylon import pylon
except ImportError:
raise ImportError("pypylon is not installed. Cannot start PythonBackend.")
try:
import onnxruntime
except ImportError:
raise ImportError("onnxruntime is not installed. Cannot start PythonBackend.")
try:
# Initialize the camera
self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
self.camera.Open()
# Start grabbing continuously
self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
print("PythonBackend: Basler camera opened and started grabbing.")
except Exception as e:
print(f"PythonBackend: Error opening Basler camera: {e}")
self.camera = None
try:
# Load the ONNX model
self.inference_session = onnxruntime.InferenceSession(self.config['model_path'])
print(f"PythonBackend: ONNX model loaded from {self.config['model_path']}.")
except Exception as e:
print(f"PythonBackend: Error loading ONNX model: {e}")
self.inference_session = None
print("PythonBackend started.")
def stop(self):
"""
Releases the camera resources.
"""
if self.camera and self.camera.IsGrabbing():
self.camera.StopGrabbing()
print("PythonBackend: Basler camera stopped grabbing.")
if self.camera and self.camera.IsOpen():
self.camera.Close()
print("PythonBackend: Basler camera closed.")
print("PythonBackend stopped.")
def _postprocess_output(self, outputs, original_image_shape):
"""
Post-processes the raw output from the YOLOv10 model.
Args:
outputs (list): A list of numpy arrays representing the model's output.
original_image_shape (tuple): The shape of the original image (height, width).
Returns:
dict: A dictionary containing the processed pupil data.
"""
# TODO: Implement the actual post-processing logic.
# This will involve non-maximum suppression (NMS) and parsing the
# bounding boxes and segmentation masks.
print("Post-processing model output...")
pupil_data = {
"raw_model_output_shape": [o.shape for o in outputs],
"pupil_position": (100, 120), # Placeholder
"pupil_diameter": 30, # Placeholder
"bounding_box": [50, 70, 150, 170] # Placeholder [x1, y1, x2, y2]
}
return pupil_data
def get_pupil_data(self):
"""
Grabs a frame from the camera, runs inference, and returns pupil data.
"""
if not self.camera or not self.camera.IsGrabbing():
print("PythonBackend: Camera not ready.")
return None
if not self.inference_session:
print("PythonBackend: Inference session not ready.")
return None
grab_result = None
try:
import cv2
import numpy as np
from pypylon import pylon
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
image = grab_result.Array
original_shape = image.shape
# Image preprocessing
if len(image.shape) == 2:
image = cv2.cvtColor(image, cv2.COLOR_BAYER_BG2RGB)
input_shape = (640, 640)
resized_image = cv2.resize(image, input_shape)
normalized_image = resized_image.astype(np.float32) / 255.0
transposed_image = np.transpose(normalized_image, (2, 0, 1))
input_tensor = np.expand_dims(transposed_image, axis=0)
# Run inference
input_name = self.inference_session.get_inputs()[0].name
output_names = [o.name for o in self.inference_session.get_outputs()]
outputs = self.inference_session.run(output_names, {input_name: input_tensor})
# Post-process the output
pupil_data = self._postprocess_output(outputs, original_shape)
return pupil_data
else:
print(f"PythonBackend: Error grabbing frame: {grab_result.ErrorCode} {grab_result.ErrorDescription}")
return None
except Exception as e:
print(f"PythonBackend: An error occurred during frame grabbing or inference: {e}")
return None
finally:
if grab_result:
grab_result.Release()
if __name__ == '__main__':
# Example usage
config = {"camera_id": 0, "model_path": "yolov10.onnx"}
try:
vision_system = VisionSystem(config)
vision_system.start()
# In a real application, this would run in a loop
pupil_data = vision_system.get_pupil_data()
print(f"Received pupil data: {pupil_data}")
vision_system.stop()
except NotImplementedError as e:
print(e)
except Exception as e:
print(f"An error occurred: {e}")

100
tests/test_e2e.py Normal file
View File

@ -0,0 +1,100 @@
import pytest
import subprocess
import time
import requests
import os
import sys
from playwright.sync_api import Page, expect
# Define the host and port for the application
HOST = "127.0.0.1"
PORT = 5000
BASE_URL = f"http://{HOST}:{PORT}"
STDOUT_FILE = "app_stdout.log"
STDERR_FILE = "app_stderr.log"
@pytest.fixture(scope="module")
def run_app():
"""
Fixture to run the Flask application in a test environment.
"""
# Set the environment variable for the subprocess
env = os.environ.copy()
env["PUPILOMETER_ENV"] = "test"
command = [sys.executable, "-u", "app.py"]
with open(STDOUT_FILE, "w") as stdout_f, open(STDERR_FILE, "w") as stderr_f:
process = subprocess.Popen(
command,
cwd="src/controllerSoftware",
stdout=stdout_f,
stderr=stderr_f,
text=True,
env=env
)
# Wait for the app to start
start_time = time.time()
while True:
if os.path.exists(STDERR_FILE):
with open(STDERR_FILE, "r") as f:
if "* Running on http" in f.read():
break
if time.time() - start_time > 15:
raise TimeoutError("Flask app failed to start in time.")
time.sleep(0.5)
yield process
process.terminate()
process.wait()
if os.path.exists(STDOUT_FILE):
os.remove(STDOUT_FILE)
if os.path.exists(STDERR_FILE):
os.remove(STDERR_FILE)
def test_program_output(run_app):
"""
Tests that the mock backend is initialized.
"""
with open(STDOUT_FILE, "r") as f:
stdout = f.read()
assert "Initializing Mock backend for testing..." in stdout
assert "MockBackend initialized." in stdout
def test_curl_output(run_app):
"""
Tests the API endpoints using requests (similar to curl).
"""
# Test the /ble_status endpoint
response_ble = requests.get(f"{BASE_URL}/ble_status")
assert response_ble.status_code == 200
assert response_ble.json() == {"connected": True} # In DEBUG_MODE
# Test the /vision/pupil_data endpoint
response_vision = requests.get(f"{BASE_URL}/vision/pupil_data")
assert response_vision.status_code == 200
assert "data" in response_vision.json()
assert "success" in response_vision.json()
def test_playwright_checks(page: Page, run_app):
"""
Performs basic and visual checks using Playwright.
"""
page.goto(BASE_URL)
# Basic output check: Title and heading
expect(page).to_have_title("Lamp Matrix Control")
heading = page.locator("h1")
expect(heading).to_have_text("Lamp Matrix Control")
# Visual check: Screenshot
os.makedirs("screenshots", exist_ok=True)
screenshot_path = "screenshots/homepage.png"
page.screenshot(path=screenshot_path)
assert os.path.exists(screenshot_path)

95
tests/test_vision.py Normal file
View File

@ -0,0 +1,95 @@
import unittest
from unittest.mock import patch
import sys
import os
# Add the src/controllerSoftware directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware')))
from vision import VisionSystem, DeepStreamBackend, PythonBackend
class TestVisionSystem(unittest.TestCase):
"""
Unit tests for the VisionSystem class.
"""
def setUp(self):
"""
Set up a VisionSystem instance with a mocked backend for each test.
"""
self.config = {"camera_id": 0, "model_path": "yolov10.onnx"}
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_initialization_linux(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Linux.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_initialization_windows(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Windows.
"""
mock_system.return_value = 'Windows'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
@patch('vision.PythonBackend')
def test_initialization_macos(self, mock_backend, mock_system):
"""
Test that the VisionSystem initializes the PythonBackend on macOS.
"""
mock_system.return_value = 'Darwin'
vision_system = VisionSystem(self.config)
mock_backend.assert_called_once_with(self.config)
@patch('platform.system')
def test_initialization_unsupported(self, mock_system):
"""
Test that the VisionSystem raises an exception on an unsupported OS.
"""
mock_system.return_value = 'UnsupportedOS'
with self.assertRaises(NotImplementedError):
VisionSystem(self.config)
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_start(self, mock_backend, mock_system):
"""
Test that the start method calls the backend's start method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.start()
vision_system._backend.start.assert_called_once()
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_stop(self, mock_backend, mock_system):
"""
Test that the stop method calls the backend's stop method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.stop()
vision_system._backend.stop.assert_called_once()
@patch('platform.system')
@patch('vision.DeepStreamBackend')
def test_get_pupil_data(self, mock_backend, mock_system):
"""
Test that the get_pupil_data method calls the backend's get_pupil_data method.
"""
mock_system.return_value = 'Linux'
vision_system = VisionSystem(self.config)
vision_system.get_pupil_data()
vision_system._backend.get_pupil_data.assert_called_once()
if __name__ == '__main__':
unittest.main()