Compare commits

..

No commits in common. "dev" and "feature/merged-app" have entirely different histories.

28 changed files with 416 additions and 3399 deletions

13
.gitignore vendored
View File

@ -1,13 +0,0 @@
# Virtual Environment
.venv/
init/
# Python cache
__pycache__/
*.pyc
# Test artifacts
app_stdout.log
app_stderr.log
screenshots/
.DS_Store

View File

@ -1,23 +0,0 @@
### Pupil Segmentation Integration
- **Objective:** Integrated Pupil segmentation into the mono camera pipelines.
- **Key Changes:**
- Modified `src/unified_web_ui/gstreamer_pipeline.py` to:
- Add a `tee` element for mono camera streams to split the video feed.
- Create a new branch for pupil segmentation with a `videoconvert` placeholder and a dedicated `appsink` (`seg_sink_{i}`).
- Implement `on_new_seg_sample_factory` callback to handle segmentation data.
- Added `seg_frame_buffers` and `seg_buffer_locks` for segmentation output.
- Introduced `get_seg_frame_by_id` to retrieve segmentation frames.
- Ensured unique naming for `tee` elements (`t_{i}`) in the GStreamer pipeline to prevent linking errors.
- Modified `src/unified_web_ui/app.py` to:
- Add a new Flask route `/segmentation_feed/<int:stream_id>` to serve the segmentation video stream.
- Added `datetime.utcnow` to the Jinja2 context for cache-busting in templates.
- Modified `src/unified_web_web_ui/templates/index.html` to:
- Include a new "Segmentation Feed" section displaying the segmentation video streams, sourcing from `/segmentation_feed/` with cache-busting timestamps.
- Updated existing video feeds (`video_feed`) with cache-busting timestamps for consistency.
- **Testing:**
- Created `tests/test_segmentation.py` to verify the segmentation feed is visible and updating.
- Updated `src/unified_web_ui/tests/test_ui.py` to refine locators (`#camera .camera-streams-grid .camera-container-individual`) for camera stream elements, resolving conflicts with segmentation feeds.
- Updated `src/unified_web_ui/tests/test_visual.py` to refine locators (`#camera .camera-mono-row`, `#camera .camera-color-row`, `#camera .camera-mono`) to prevent strict mode violations and ensure accurate targeting of camera layout elements.
- Fixed indentation errors in `src/unified_web_ui/tests/test_visual.py`.
- **Status:** All tests are passing, and the infrastructure for pupil segmentation is in place, awaiting the integration of a DeepStream model.

View File

@ -3,26 +3,3 @@
## Introduction ## Introduction
This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder. This repository houses programs and documents related to Pupilometer project by Vietnam Academy of Science and Technology. The project aims to introduce a benchmark and researches into the interaction between light intensity and temperature to the eye strain disorder.
## Dependencies
### Python Dependencies
The Python dependencies are listed in the `requirements.txt` file. You can install them using pip:
```bash
pip install -r requirements.txt
```
### NVIDIA DeepStream
For running the pupil segmentation on a Jetson Orin AGX or a Windows machine with an NVIDIA GPU, this project uses NVIDIA DeepStream. DeepStream is a complex dependency and cannot be installed via pip.
Please follow the official NVIDIA documentation to install DeepStream for your platform:
* **Jetson:** [DeepStream for Jetson](https://developer.nvidia.com/deepstream-sdk-jetson)
* **Windows:** [DeepStream for Windows](https://developer.nvidia.com/deepstream-sdk-windows)
You will also need to install GStreamer and the Python bindings (PyGObject). These are usually installed as part of the DeepStream installation.
Additionally, the `pyds` library, which provides Python bindings for DeepStream metadata structures, is required. This library is also included with the DeepStream SDK and may need to be installed manually.

View File

@ -1,146 +1,2 @@
appdirs==1.4.4 bleak>="1.0.0"
apturl==0.5.2 flask>="3.1.1"
async-timeout==5.0.1
attrs==21.2.0
bcrypt==3.2.0
beniget==0.4.1
bleak==2.0.0
blinker==1.9.0
Brlapi==0.8.3
Brotli==1.0.9
certifi==2020.6.20
chardet==4.0.0
charset-normalizer==3.4.4
click==8.3.1
colorama==0.4.4
coloredlogs==15.0.1
contourpy==1.3.2
cpuset==1.6
cryptography==3.4.8
cupshelpers==1.0
cycler==0.11.0
dbus-fast==3.1.2
dbus-python==1.2.18
decorator==4.4.2
defer==1.0.6
distro==1.7.0
distro-info==1.1+ubuntu0.2
duplicity==0.8.21
exceptiongroup==1.3.1
fasteners==0.14.1
filelock==3.20.0
Flask==3.1.2
flatbuffers==25.9.23
fonttools==4.29.1
fs==2.4.12
fsspec==2025.10.0
future==0.18.2
gast==0.5.2
greenlet==3.2.4
httplib2==0.20.2
humanfriendly==10.0
idna==3.3
importlib-metadata==4.6.4
iniconfig==2.3.0
itsdangerous==2.2.0
jeepney==0.7.1
Jetson.GPIO==2.1.7
Jinja2==3.1.6
keyring==23.5.0
kiwisolver==1.3.2
language-selector==0.1
launchpadlib==1.10.16
lazr.restfulclient==0.14.4
lazr.uri==1.0.6
lockfile==0.12.2
louis==3.20.0
lxml==4.8.0
lz4==3.1.3+dfsg
macaroonbakery==1.3.1
Mako==1.1.3
MarkupSafe==3.0.3
matplotlib==3.5.1
meson==1.9.1
ml_dtypes==0.5.4
monotonic==1.6
more-itertools==8.10.0
mpmath==1.3.0
networkx==3.4.2
ninja==1.13.0
numpy==2.2.6
oauthlib==3.2.0
olefile==0.46
onboard==1.4.1
onnx==1.20.0
onnxruntime==1.23.2
onnxslim==0.1.77
opencv-python==4.12.0.88
packaging==25.0
pandas==1.3.5
paramiko==2.9.3
pexpect==4.8.0
Pillow==9.0.1
playwright==1.56.0
pluggy==1.6.0
ply==3.11
polars==1.35.2
polars-runtime-32==1.35.2
protobuf==6.33.1
psutil==7.1.3
ptyprocess==0.7.0
pycairo==1.20.1
pycups==2.0.1
pyee==13.0.0
Pygments==2.19.2
PyGObject==3.42.1
PyJWT==2.3.0
pymacaroons==0.13.0
PyNaCl==1.5.0
PyOpenGL==3.1.5
pyparsing==2.4.7
pypylon==4.2.0
pyRFC3339==1.1
pyservicemaker @ file:///opt/nvidia/deepstream/deepstream-7.1/service-maker/python/pyservicemaker-0.0.1-py3-none-linux_aarch64.whl
pytest==9.0.1
pytest-base-url==2.1.0
pytest-playwright==0.7.2
python-apt==2.4.0+ubuntu4
python-dateutil==2.8.1
python-dbusmock==0.27.5
python-debian==0.1.43+ubuntu1.1
python-slugify==8.0.4
pythran==0.10.0
pytz==2022.1
pyxdg==0.27
PyYAML==6.0.3
requests==2.25.1
scipy==1.8.0
seaborn==0.13.2
SecretStorage==3.3.1
six==1.16.0
SQLAlchemy==2.0.44
sympy==1.14.0
systemd-python==234
text-unidecode==1.3
thop==0.1.1.post2209072238
tomli==2.3.0
torch==2.9.1
torchaudio==2.9.1
torchvision==0.24.1
tqdm==4.67.1
typing_extensions==4.15.0
ubuntu-advantage-tools==8001
ubuntu-drivers-common==0.0.0
ufoLib2==0.13.1
ultralytics==8.3.233
ultralytics-thop==2.0.18
unicodedata2==14.0.0
urllib3==1.26.5
urwid==2.1.2
uv==0.9.13
wadllib==1.3.6
websockets==15.0.1
Werkzeug==3.1.4
xdg==5
xkit==0.0.0
zipp==1.0.0

View File

@ -1,8 +0,0 @@
# Activate the virtual environment
. .\.venv\Scripts\Activate.ps1
# Install dependencies
pip install -r requirements.txt
# Run the Flask application
python src/controllerSoftware/app.py

4
run.sh
View File

@ -1,4 +0,0 @@
#!/bin/bash
source .venv/bin/activate
pip install -r requirements.txt
python src/controllerSoftware/app.py

View File

@ -1,59 +0,0 @@
#!/bin/bash
# Start the Flask application in the background
python src/unified_web_ui/app.py &
APP_PID=$!
# Wait for the application to start
echo "Waiting for application to start..."
sleep 10
# Check if the application is running
if ! ps -p $APP_PID > /dev/null
then
echo "Application failed to start."
exit 1
fi
# Run the curl tests
echo "Running curl tests..."
http_code=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/)
echo "Main page status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "Main page test failed."
fi
http_code=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/get_fps)
echo "get_fps status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "get_fps test failed."
fi
matrix_data='{"matrix":['
for i in {1..5}; do
matrix_data+='['
for j in {1..5}; do
matrix_data+='{"ww":0,"cw":0,"blue":0}'
if [ $j -lt 5 ]; then
matrix_data+=','
fi
done
matrix_data+=']'
if [ $i -lt 5 ]; then
matrix_data+=','
fi
done
matrix_data+=']}'
http_code=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H "Content-Type: application/json" -d "$matrix_data" http://localhost:5000/set_matrix)
echo "set_matrix status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "set_matrix test failed."
fi
# Run the pytest tests
echo "Running pytest tests..."
pytest src/unified_web_ui/tests/
# Kill the Flask application
kill $APP_PID

View File

@ -14,7 +14,7 @@ import os
# Set to True to run without a physical BLE device for testing purposes. # Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix. # Set to False to connect to the actual lamp matrix.
DEBUG_MODE = False DEBUG_MODE = True
# --- BLE Device Configuration (Ignored in DEBUG_MODE) --- # --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
DEVICE_NAME = "Pupilometer LED Billboard" DEVICE_NAME = "Pupilometer LED Billboard"
@ -84,17 +84,17 @@ async def set_full_matrix_on_ble(colorSeries):
# ===================================================================== # =====================================================================
# SNIPPET TO PATCH SWAPPED LAMP POSITIONS # SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# ===================================================================== # =====================================================================
#print("Patching lamp positions 3 <-> 7 and 12 <-> 24.") print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
# Swap data for lamps at positions 3 and 7 # Swap data for lamps at positions 3 and 7
#temp_color_3 = colorSeries[3] temp_color_3 = colorSeries[3]
#colorSeries[3] = colorSeries[7] colorSeries[3] = colorSeries[7]
#colorSeries[7] = temp_color_3 colorSeries[7] = temp_color_3
# Swap data for lamps at positions 12 and 24 # Swap data for lamps at positions 12 and 24
#temp_color_12 = colorSeries[12] temp_color_12 = colorSeries[12]
#colorSeries[12] = colorSeries[24] colorSeries[12] = colorSeries[24]
#colorSeries[24] = temp_color_12 colorSeries[24] = temp_color_12
# ===================================================================== # =====================================================================
if DEBUG_MODE: if DEBUG_MODE:

View File

@ -1,267 +0,0 @@
import sys
import subprocess
import threading
import time
import gc
import json
from flask import Flask, Response, render_template_string, jsonify
# --- CONFIGURATION ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- PART 1: DETECTION ---
def scan_connected_cameras():
print("--- Scanning for Basler Cameras ---")
detection_script = """
import sys
try:
from pypylon import pylon
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("NONE")
else:
serials = [d.GetSerialNumber() for d in devices]
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
cam.Open()
try:
cam.BinningHorizontal.Value = 2
cam.BinningVertical.Value = 2
w = cam.Width.GetValue()
h = cam.Height.GetValue()
cam.BinningHorizontal.Value = 1
cam.BinningVertical.Value = 1
supported = 1
except:
w = cam.Width.GetValue()
h = cam.Height.GetValue()
supported = 0
cam.Close()
print(f"{','.join(serials)}|{w}|{h}|{supported}")
except Exception:
print("NONE")
"""
try:
result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
output = result.stdout.strip()
if "NONE" in output or not output:
return [], DEFAULT_W, DEFAULT_H, False
parts = output.split('|')
return parts[0].split(','), int(parts[1]), int(parts[2]), (parts[3] == '1')
except: return [], DEFAULT_W, DEFAULT_H, False
DETECTED_SERIALS, CAM_W, CAM_H, BINNING_SUPPORTED = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_SERIALS)
# --- RESOLUTION & LAYOUT ---
INTERNAL_WIDTH = 1280
if ACTUAL_CAMS_COUNT > 0:
scale = INTERNAL_WIDTH / CAM_W
INTERNAL_HEIGHT = int(CAM_H * scale)
else:
INTERNAL_HEIGHT = 720
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT} Cams")
# --- FLASK & GSTREAMER ---
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
app = Flask(__name__)
frame_buffer = None
buffer_lock = threading.Lock()
current_fps = 0.0
frame_count = 0
start_time = time.time()
class GStreamerPipeline(threading.Thread):
def __init__(self):
super().__init__()
self.loop = GLib.MainLoop()
self.pipeline = None
def run(self):
Gst.init(None)
self.build_pipeline()
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
self.pipeline.set_state(Gst.State.NULL)
def on_new_sample(self, sink):
global frame_count, start_time, current_fps
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
frame_count += 1
# Calculate FPS every 30 frames
if frame_count % 30 == 0:
elapsed = time.time() - start_time
current_fps = 30 / elapsed if elapsed > 0 else 0
start_time = time.time()
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
global frame_buffer
with buffer_lock:
frame_buffer = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
def build_pipeline(self):
# 1. CAMERA SETTINGS
# Note: We run cameras at 60 FPS for internal stability
cam_settings = (
"cam::TriggerMode=Off "
"cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 "
"cam::ExposureAuto=Off "
"cam::ExposureTime=20000.0 "
"cam::GainAuto=Continuous "
"cam::DeviceLinkThroughputLimitMode=Off "
)
if BINNING_SUPPORTED:
cam_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
sources_str = ""
for i in range(TARGET_NUM_CAMS):
if i < len(DETECTED_SERIALS):
# --- REAL CAMERA SOURCE ---
serial = DETECTED_SERIALS[i]
print(f"Slot {i}: Linking Camera {serial}")
pre_scale = (
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={INTERNAL_WIDTH}, height={INTERNAL_HEIGHT}, framerate=60/1 ! "
)
source = (
f"pylonsrc device-serial-number={serial} {cam_settings} ! "
"video/x-raw,format=GRAY8 ! "
"videoconvert ! "
"video/x-raw,format=I420 ! "
"nvvideoconvert compute-hw=1 ! "
"video/x-raw(memory:NVMM) ! "
f"{pre_scale}"
f"m.sink_{i} "
)
else:
# --- DISCONNECTED PLACEHOLDER ---
print(f"Slot {i}: Creating Placeholder (Synchronized)")
# FIX 1: Add 'videorate' to enforce strict timing on the fake source
# This prevents the placeholder from running too fast/slow and jittering the muxer
source = (
f"videotestsrc pattern=black is-live=true ! "
f"videorate ! " # <--- TIMING ENFORCER
f"video/x-raw,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},format=I420,framerate=60/1 ! "
f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM),format=NV12,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},framerate=60/1 ! "
f"m.sink_{i} "
)
sources_str += source
# 3. MUXER & PROCESSING
# FIX 2: batched-push-timeout=33000
# This tells the muxer: "If you have data, send it every 33ms (30fps). Don't wait forever."
# FIX 3: Output Videorate
# We process internally at 60fps (best for camera driver), but we DROP to 30fps
# for the web stream. This makes the network stream buttery smooth and consistent.
processing = (
f"nvstreammux name=m batch-size={TARGET_NUM_CAMS} width={INTERNAL_WIDTH} height={INTERNAL_HEIGHT} "
f"live-source=1 batched-push-timeout=33000 ! " # <--- TIMEOUT FIX
f"nvmultistreamtiler width={WEB_WIDTH} height={WEB_HEIGHT} rows=1 columns={TARGET_NUM_CAMS} ! "
"nvvideoconvert compute-hw=1 ! "
"video/x-raw(memory:NVMM) ! "
"videorate drop-only=true ! " # <--- DROPPING FRAMES CLEANLY
"video/x-raw(memory:NVMM), framerate=30/1 ! " # <--- Force 30 FPS Output
f"nvjpegenc quality=60 ! "
"appsink name=sink emit-signals=True sync=False max-buffers=1 drop=True"
)
pipeline_str = f"{sources_str} {processing}"
print(f"Launching SMOOTH Pipeline...")
self.pipeline = Gst.parse_launch(pipeline_str)
appsink = self.pipeline.get_by_name("sink")
appsink.connect("new-sample", self.on_new_sample)
# --- FLASK ---
@app.route('/')
def index():
return render_template_string('''
<html>
<head>
<style>
body { background-color: #111; color: white; text-align: center; font-family: monospace; margin: 0; padding: 20px; }
.container { position: relative; display: inline-block; border: 3px solid #4CAF50; }
img { display: block; max-width: 100%; height: auto; }
.hud {
position: absolute; top: 10px; left: 10px;
background: rgba(0, 0, 0, 0.6); color: #00FF00;
padding: 5px 10px; font-weight: bold; pointer-events: none;
}
</style>
</head>
<body>
<h1>Basler 3-Cam (Smooth)</h1>
<div class="container">
<div class="hud" id="fps-counter">FPS: --</div>
<img src="{{ url_for('video_feed') }}">
</div>
<script>
setInterval(function() {
fetch('/get_fps').then(r => r.json()).then(d => {
document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
});
}, 500);
</script>
</body>
</html>
''')
@app.route('/video_feed')
def video_feed():
def generate():
count = 0
while True:
with buffer_lock:
if frame_buffer:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame_buffer + b'\r\n')
# Sleep 33ms (30 FPS)
time.sleep(0.033)
count += 1
if count % 200 == 0: gc.collect()
return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/get_fps')
def get_fps():
return jsonify(fps=round(current_fps, 1))
if __name__ == "__main__":
subprocess.run([sys.executable, "-c", "import gc; gc.collect()"])
gst_thread = GStreamerPipeline()
gst_thread.daemon = True
gst_thread.start()
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

View File

@ -1,58 +0,0 @@
from pypylon import pylon
import time
import sys
try:
# Get the Transport Layer Factory
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("No cameras found!")
sys.exit(1)
print(f"Found {len(devices)} cameras. Checking Camera 1...")
# Connect to first camera
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
cam.Open()
# 1. Reset to Defaults
print("Reseting to Defaults...")
cam.UserSetSelector.Value = "Default"
cam.UserSetLoad.Execute()
# 2. Enable Auto Exposure/Gain
print("Enabling Auto Exposure & Gain...")
cam.ExposureAuto.Value = "Continuous"
cam.GainAuto.Value = "Continuous"
# 3. Wait for it to settle (Camera adjusts to light)
print("Waiting 3 seconds for auto-adjustment...")
for i in range(3):
print(f"{3-i}...")
time.sleep(1)
# 4. READ VALUES
current_exposure = cam.ExposureTime.GetValue() # In Microseconds (us)
current_fps_readout = cam.ResultingFrameRate.GetValue()
print("-" * 30)
print(f"REPORT FOR SERIAL: {cam.GetDeviceInfo().GetSerialNumber()}")
print("-" * 30)
print(f"Current Exposure Time: {current_exposure:.1f} us ({current_exposure/1000:.1f} ms)")
print(f"Theoretical Max FPS: {1000000 / current_exposure:.1f} FPS")
print(f"Camera Internal FPS: {current_fps_readout:.1f} FPS")
print("-" * 30)
if current_exposure > 33000:
print("⚠️ PROBLEM FOUND: Exposure is > 33ms.")
print(" This physically prevents the camera from reaching 30 FPS.")
print(" Solution: Add more light or limit AutoExposureUpperLimit.")
else:
print("✅ Exposure looks fast enough for 30 FPS.")
cam.Close()
except Exception as e:
print(f"Error: {e}")

View File

@ -1,33 +0,0 @@
# Unified WebUI
This application combines the functionality of the `detectionSoftware` and `controllerSoftware` into a single, unified web interface.
## Features
- **Camera View:** Displays a tiled video stream from multiple Basler cameras.
- **Lamp Control:** Provides a web interface to control a 5x5 LED matrix via Bluetooth Low Energy (BLE).
- **Responsive UI:** The UI is designed to work on both desktop and mobile devices. On desktop, the lamp control and camera view are displayed side-by-side. On mobile, they are in separate tabs.
## Setup
1. **Install dependencies:**
```bash
pip install -r requirements.txt
```
2. **Run the application:**
```bash
python src/unified_web_ui/app.py
```
3. **Open the web interface:**
Open a web browser and navigate to `http://<your-ip-address>:5000`.
## Modules
- **`app.py`:** The main Flask application file.
- **`ble_controller.py`:** Handles the BLE communication with the lamp matrix.
- **`camera_scanner.py`:** Scans for connected Basler cameras.
- **`gstreamer_pipeline.py`:** Creates and manages the GStreamer pipeline for video processing.
- **`templates/index.html`:** The main HTML template for the web interface.
- **`static/style.css`:** The CSS file for styling the web interface.

View File

@ -1,226 +0,0 @@
import sys
import subprocess
import threading
import time
import asyncio
import json
import signal
import os
from flask import Flask, Response, render_template, request, jsonify, g
from camera_scanner import scan_connected_cameras
from gstreamer_pipeline import GStreamerPipeline
from ble_controller import BLEController, get_spiral_address, SPIRAL_MAP_5x5, lampAmount
# =================================================================================================
# APP CONFIGURATION
# =================================================================================================
# --- Camera Configuration ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- BLE Device Configuration ---
DEVICE_NAME = "Pupilometer LED Billboard"
DEBUG_MODE = False # Set to True to run without a physical BLE device
# =================================================================================================
# INITIALIZATION
# =================================================================================================
# --- Camera Initialization ---
DETECTED_CAMS = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_CAMS)
# Sort cameras: color camera first, then mono cameras
# Assuming 'is_color' is a reliable flag
# If no color camera exists, the first mono will be at index 0.
detected_cams_sorted = sorted(DETECTED_CAMS, key=lambda x: x['is_color'], reverse=True)
if ACTUAL_CAMS_COUNT > 0:
MASTER_W = detected_cams_sorted[0]['width']
MASTER_H = detected_cams_sorted[0]['height']
else:
MASTER_W = DEFAULT_W
MASTER_H = DEFAULT_H
INTERNAL_WIDTH = 1280
scale = INTERNAL_WIDTH / MASTER_W
INTERNAL_HEIGHT = int(MASTER_H * scale)
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1 # Ensure even for some GStreamer elements
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT}")
for c in detected_cams_sorted:
print(f" - Cam {c['serial']} ({c['model']}): {'COLOR' if c['is_color'] else 'MONO'}")
# --- Flask App Initialization ---
app = Flask(__name__)
# --- GStreamer Initialization ---
gst_thread = GStreamerPipeline(detected_cams_sorted, TARGET_NUM_CAMS, INTERNAL_WIDTH, INTERNAL_HEIGHT, WEB_WIDTH, WEB_HEIGHT)
gst_thread.daemon = True
gst_thread.start()
# --- BLE Initialization ---
ble_controller = BLEController(DEVICE_NAME, DEBUG_MODE)
ble_thread = None
if not DEBUG_MODE:
ble_controller.ble_event_loop = asyncio.new_event_loop()
ble_thread = threading.Thread(target=ble_controller.ble_event_loop.run_forever, daemon=True)
ble_thread.start()
future = asyncio.run_coroutine_threadsafe(ble_controller.connect(), ble_controller.ble_event_loop)
try:
future.result(timeout=10)
except Exception as e:
print(f"Failed to connect to BLE device: {e}")
# Optionally, set DEBUG_MODE to True here if BLE connection is critical
# DEBUG_MODE = True
# --- In-memory matrix for DEBUG_MODE ---
lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
# =================================================================================================
# COLOR MIXING
# =================================================================================================
def calculate_rgb(ww, cw, blue):
warm_white_r, warm_white_g, warm_white_b = 255, 192, 128
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255
blue_r, blue_g, blue_b = 0, 0, 255
r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g
b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b
r = int(min(255, round(r)))
g = int(min(255, round(g)))
b = int(min(255, round(b)))
return r, g, b
def rgb_to_hex(r, g, b):
r = int(max(0, min(255, r)))
g = int(max(0, min(255, g)))
b = int(max(0, min(255, b)))
return f'#{r:02x}{g:02x}{b:02x}'
# =================================================================================================
# FLASK ROUTES
# =================================================================================================
from datetime import datetime
@app.context_processor
def inject_now():
return {'now': datetime.utcnow}
@app.before_request
def before_request():
g.detected_cams_info = []
for cam in gst_thread.sorted_cams:
cam_copy = cam.copy()
if cam_copy['height'] > 0:
cam_copy['aspect_ratio'] = cam_copy['width'] / cam_copy['height']
else:
cam_copy['aspect_ratio'] = 16 / 9 # Default aspect ratio
g.detected_cams_info.append(cam_copy)
@app.route('/')
def index():
return render_template('index.html', matrix=lamp_matrix, detected_cams_info=g.detected_cams_info)
@app.route('/video_feed/<int:stream_id>')
def video_feed(stream_id):
def generate(stream_id):
while True:
frame = gst_thread.get_frame_by_id(stream_id)
if frame:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.016) # Roughly 60 fps
return Response(generate(stream_id), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/segmentation_feed/<int:stream_id>')
def segmentation_feed(stream_id):
def generate(stream_id):
while True:
frame = gst_thread.get_seg_frame_by_id(stream_id)
if frame:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.016) # Roughly 60 fps
return Response(generate(stream_id), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/get_fps')
def get_fps():
return jsonify(fps=gst_thread.get_fps())
@app.route('/set_matrix', methods=['POST'])
def set_matrix():
data = request.get_json()
full_matrix = data.get('matrix', [])
if not full_matrix or len(full_matrix) != 5 or len(full_matrix[0]) != 5:
return jsonify(success=False, message="Invalid matrix data received"), 400
serial_colors = [b'\x00\x00\x00'] * lampAmount
try:
for row in range(5):
for col in range(5):
lamp_data = full_matrix[row][col]
ww = int(lamp_data['ww'])
cw = int(lamp_data['cw'])
blue = int(lamp_data['blue'])
color_bytes = bytes([ww, cw, blue])
spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5)
if spiral_pos != -1:
serial_colors[spiral_pos] = color_bytes
lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue)
lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB)
if DEBUG_MODE:
return jsonify(success=True)
else:
asyncio.run_coroutine_threadsafe(
ble_controller.set_full_matrix(serial_colors),
ble_controller.ble_event_loop
)
return jsonify(success=True)
except Exception as e:
print(f"Error in set_matrix route: {e}")
return jsonify(success=False, message=str(e)), 500
# =================================================================================================
# APP SHUTDOWN
# =================================================================================================
def signal_handler(signum, frame):
print("Received shutdown signal, gracefully shutting down...")
if not DEBUG_MODE:
disconnect_future = asyncio.run_coroutine_threadsafe(ble_controller.disconnect(), ble_controller.ble_event_loop)
try:
disconnect_future.result(timeout=5)
except Exception as e:
print(f"Error during BLE disconnect: {e}")
if not DEBUG_MODE and ble_controller.ble_event_loop and ble_controller.ble_event_loop.is_running():
ble_controller.ble_event_loop.call_soon_threadsafe(ble_controller.ble_event_loop.stop)
ble_thread.join(timeout=1)
os._exit(0)
# =================================================================================================
# APP STARTUP
# =================================================================================================
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True, use_reloader=False)

View File

@ -1,108 +0,0 @@
import asyncio
from bleak import BleakScanner, BleakClient
# =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode)
# =================================================================================================
lampAmount = 25
def create_spiral_map(n=5):
if n % 2 == 0:
raise ValueError("Matrix size must be odd for a unique center point.")
spiral_map = [[0] * n for _ in range(n)]
r, c = n // 2, n // 2
address = 0
spiral_map[r][c] = address
dr = [-1, 0, 1, 0]
dc = [0, 1, 0, -1]
direction = 0
segment_length = 1
steps = 0
while address < n * n - 1:
for _ in range(segment_length):
address += 1
r += dr[direction]
c += dc[direction]
if 0 <= r < n and 0 <= c < n:
spiral_map[r][c] = address
direction = (direction + 1) % 4
steps += 1
if steps % 2 == 0:
segment_length += 1
return spiral_map
def get_spiral_address(row, col, spiral_map):
n = len(spiral_map)
if 0 <= row < n and 0 <= col < n:
return spiral_map[row][col]
else:
return -1
SPIRAL_MAP_5x5 = create_spiral_map(5)
class BLEController:
def __init__(self, device_name, debug_mode=False):
self.device_name = device_name
self.debug_mode = debug_mode
self.ble_client = None
self.ble_characteristics = None
self.ble_event_loop = None
async def connect(self):
print(f"Scanning for device: {self.device_name}...")
devices = await BleakScanner.discover()
target_device = next((d for d in devices if d.name == self.device_name), None)
if not target_device:
print(f"Device '{self.device_name}' not found.")
return False
print(f"Found device: {target_device.name} ({target_device.address})")
try:
self.ble_client = BleakClient(target_device.address)
await self.ble_client.connect()
if self.ble_client.is_connected:
print(f"Connected to {target_device.name}")
services = [service for service in self.ble_client.services if service.handle != 1]
characteristics = [
char for service in services for char in service.characteristics
]
self.ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
print(f"Found {len(self.ble_characteristics)} characteristics for lamps.")
return True
else:
print(f"Failed to connect to {target_device.name}")
return False
except Exception as e:
print(f"An error occurred during BLE connection: {e}")
return False
async def disconnect(self):
if self.ble_client and self.ble_client.is_connected:
await self.ble_client.disconnect()
print("BLE client disconnected.")
async def set_full_matrix(self, color_series):
if not self.ble_client or not self.ble_client.is_connected:
print("BLE client not connected. Attempting to reconnect...")
await self.connect()
if not self.ble_client or not self.ble_client.is_connected:
print("Failed to reconnect to BLE client.")
return
if self.debug_mode:
print(f"Constructed the following matrix data: {color_series}")
for i, char in enumerate(self.ble_characteristics):
value_to_write = color_series[i]
print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}")
await self.ble_client.write_gatt_char(char.uuid, value_to_write)
else:
value_to_write = b"".join([color for color in color_series])
print(f"Setting lamps to {value_to_write.hex()}")
await self.ble_client.write_gatt_char(self.ble_characteristics[0].uuid, value_to_write)

View File

@ -1,51 +0,0 @@
import sys
import subprocess
def scan_connected_cameras():
print("--- Scanning for Basler Cameras ---")
detection_script = """
import sys
try:
from pypylon import pylon
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("NONE")
else:
results = []
for i in range(len(devices)):
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[i]))
cam.Open()
serial = cam.GetDeviceInfo().GetSerialNumber()
model = cam.GetDeviceInfo().GetModelName()
is_color = model.endswith("c") or "Color" in model
w = cam.Width.GetValue()
h = cam.Height.GetValue()
binning = 0
try:
cam.BinningHorizontal.Value = 2
cam.BinningVertical.Value = 2
cam.BinningHorizontal.Value = 1
cam.BinningVertical.Value = 1
binning = 1
except: pass
current_fmt = cam.PixelFormat.GetValue()
cam.Close()
results.append(f"{serial}:{w}:{h}:{binning}:{1 if is_color else 0}:{model}:{current_fmt}")
print("|".join(results))
except Exception: print("NONE")
"""
try:
result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
output = result.stdout.strip()
if "NONE" in output or not output: return []
camera_list = []
entries = output.split('|')
for entry in entries:
parts = entry.split(':')
camera_list.append({
"serial": parts[0], "width": int(parts[1]), "height": int(parts[2]),
"binning": (parts[3] == '1'), "is_color": (parts[4] == '1'), "model": parts[5]
})
return camera_list
except: return []

View File

@ -1,195 +0,0 @@
import threading
import time
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GObject
class GStreamerPipeline(threading.Thread):
def __init__(self, detected_cams, target_num_cams, internal_width, internal_height, web_width, web_height):
super().__init__()
self.loop = GLib.MainLoop()
self.pipeline = None
self.target_num_cams = target_num_cams
self.internal_width = internal_width
self.internal_height = internal_height
self.web_width = web_width
self.web_height = web_height
self.frame_buffers = [None] * self.target_num_cams
self.buffer_locks = [threading.Lock() for _ in range(self.target_num_cams)]
self.seg_frame_buffers = [None] * self.target_num_cams
self.seg_buffer_locks = [threading.Lock() for _ in range(self.target_num_cams)]
self.current_fps = 0.0 # Will still report overall FPS, not per stream
self.frame_count = 0
self.start_time = time.time()
# Sort cameras: color camera first, then mono cameras
self.sorted_cams = detected_cams # We now expect detected_cams to be already sorted in app.py or be handled by the client
print(f"Sorted cameras for GStreamer: {self.sorted_cams}")
def run(self):
Gst.init(None)
self.build_pipeline()
if self.pipeline:
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
self.pipeline.set_state(Gst.State.NULL)
else:
print("GStreamer pipeline failed to build.")
def on_new_seg_sample_factory(self, stream_id):
def on_new_sample(sink):
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
with self.seg_buffer_locks[stream_id]:
self.seg_frame_buffers[stream_id] = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
return on_new_sample
def on_new_sample_factory(self, stream_id):
def on_new_sample(sink):
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
# Update overall FPS counter from the first stream
if stream_id == 0:
self.frame_count += 1
if self.frame_count % 30 == 0:
elapsed = time.time() - self.start_time
self.current_fps = 30 / float(elapsed) if elapsed > 0 else 0
self.start_time = time.time()
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
with self.buffer_locks[stream_id]:
self.frame_buffers[stream_id] = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
return on_new_sample
def build_pipeline(self):
sources_and_sinks_str = []
for i in range(self.target_num_cams):
if i < len(self.sorted_cams):
cam_info = self.sorted_cams[i]
serial = cam_info['serial']
is_color = cam_info['is_color']
print(f"Setting up pipeline for Stream {i}: {serial} [{'Color' if is_color else 'Mono'}]")
base_settings = f"pylonsrc device-serial-number={serial} " \
"cam::TriggerMode=Off " \
"cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 " \
"cam::DeviceLinkThroughputLimitMode=Off "
if is_color:
color_settings = f"{base_settings} " \
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 " \
"cam::GainAuto=Continuous " \
"cam::Width=1920 cam::Height=1080 " \
"cam::PixelFormat=BayerBG8 "
source_and_sink = (
f"{color_settings} ! "
"bayer2rgb ! " # Debayer
"videoconvert ! "
"video/x-raw,format=RGBA ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={self.internal_width}, height={self.internal_height}, framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
else:
mono_settings = f"{base_settings} " \
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 " \
"cam::GainAuto=Continuous "
if cam_info['binning']:
mono_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
source_and_sink = (
f"{mono_settings} ! "
"video/x-raw,format=GRAY8 ! "
"videoconvert ! "
f"tee name=t_{i} ! "
"queue ! "
"video/x-raw,format=I420 ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={self.internal_width}, height={self.internal_height}, framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True "
f"t_{i}. ! queue ! "
"videoconvert ! " # Placeholder for DeepStream
f"appsink name=seg_sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
else:
# Placeholder for disconnected cameras
source_and_sink = (
"videotestsrc pattern=black is-live=true ! "
f"videorate ! "
f"video/x-raw,width={self.internal_width},height={self.internal_height},format=I420,framerate=60/1 ! "
f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM),format=NV12,width={self.internal_width},height={self.internal_height},framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
sources_and_sinks_str.append(source_and_sink)
pipeline_str = " ".join(sources_and_sinks_str)
print("\n--- GStreamer Pipeline String ---")
print(pipeline_str)
print("---------------------------------\n")
self.pipeline = Gst.parse_launch(pipeline_str)
if self.pipeline is None:
print("ERROR: GStreamer pipeline failed to parse. Check pipeline string for errors.")
return
for i in range(self.target_num_cams):
appsink = self.pipeline.get_by_name(f"sink_{i}")
if appsink:
# Set caps on appsink to ensure it's negotiating JPEG
appsink.set_property("caps", Gst.Caps.from_string("image/jpeg,width=(int)[1, 2147483647],height=(int)[1, 2147483647]"))
appsink.connect("new-sample", self.on_new_sample_factory(i))
else:
print(f"Error: appsink_{i} not found in pipeline.")
segsink = self.pipeline.get_by_name(f"seg_sink_{i}")
if segsink:
segsink.connect("new-sample", self.on_new_seg_sample_factory(i))
def get_frame_by_id(self, stream_id):
if 0 <= stream_id < self.target_num_cams:
with self.buffer_locks[stream_id]:
return self.frame_buffers[stream_id]
return None
def get_seg_frame_by_id(self, stream_id):
if 0 <= stream_id < self.target_num_cams:
with self.seg_buffer_locks[stream_id]:
return self.seg_frame_buffers[stream_id]
return None
def get_fps(self):
return round(self.current_fps, 1)

View File

@ -1,301 +0,0 @@
import sys
import subprocess
import threading
import time
import gc
import json
from flask import Flask, Response, render_template_string, jsonify
# --- CONFIGURATION ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- PART 1: DETECTION (Unchanged) ---
def scan_connected_cameras():
print("--- Scanning for Basler Cameras ---")
detection_script = """
import sys
try:
from pypylon import pylon
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("NONE")
else:
results = []
for i in range(len(devices)):
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[i]))
cam.Open()
serial = cam.GetDeviceInfo().GetSerialNumber()
model = cam.GetDeviceInfo().GetModelName()
is_color = model.endswith("c") or "Color" in model
w = cam.Width.GetValue()
h = cam.Height.GetValue()
binning = 0
try:
cam.BinningHorizontal.Value = 2
cam.BinningVertical.Value = 2
cam.BinningHorizontal.Value = 1
cam.BinningVertical.Value = 1
binning = 1
except: pass
current_fmt = cam.PixelFormat.GetValue()
cam.Close()
results.append(f"{serial}:{w}:{h}:{binning}:{1 if is_color else 0}:{model}:{current_fmt}")
print("|".join(results))
except Exception: print("NONE")
"""
try:
result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
output = result.stdout.strip()
if "NONE" in output or not output: return []
camera_list = []
entries = output.split('|')
for entry in entries:
parts = entry.split(':')
camera_list.append({
"serial": parts[0], "width": int(parts[1]), "height": int(parts[2]),
"binning": (parts[3] == '1'), "is_color": (parts[4] == '1'), "model": parts[5]
})
return camera_list
except: return []
DETECTED_CAMS = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_CAMS)
# --- RESOLUTION LOGIC ---
if ACTUAL_CAMS_COUNT > 0:
MASTER_W = DETECTED_CAMS[0]['width']
MASTER_H = DETECTED_CAMS[0]['height']
else:
MASTER_W = DEFAULT_W
MASTER_H = DEFAULT_H
INTERNAL_WIDTH = 1280
scale = INTERNAL_WIDTH / MASTER_W
INTERNAL_HEIGHT = int(MASTER_H * scale)
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT}")
for c in DETECTED_CAMS:
print(f" - Cam {c['serial']} ({c['model']}): {'COLOR' if c['is_color'] else 'MONO'}")
# --- FLASK & GSTREAMER ---
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
app = Flask(__name__)
frame_buffer = None
buffer_lock = threading.Lock()
current_fps = 0.0
frame_count = 0
start_time = time.time()
class GStreamerPipeline(threading.Thread):
def __init__(self):
super().__init__()
self.loop = GLib.MainLoop()
self.pipeline = None
def run(self):
Gst.init(None)
self.build_pipeline()
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
self.pipeline.set_state(Gst.State.NULL)
def on_new_sample(self, sink):
global frame_count, start_time, current_fps
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
frame_count += 1
if frame_count % 30 == 0:
elapsed = time.time() - start_time
current_fps = 30 / elapsed if elapsed > 0 else 0
start_time = time.time()
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
global frame_buffer
with buffer_lock:
frame_buffer = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
def build_pipeline(self):
sources_str = ""
for i in range(TARGET_NUM_CAMS):
if i < len(DETECTED_CAMS):
cam_info = DETECTED_CAMS[i]
serial = cam_info['serial']
is_color = cam_info['is_color']
print(f"Slot {i}: Linking {serial} [{'Color' if is_color else 'Mono'}]")
# --- 1. BASE SETTINGS (Common) ---
# We DISABLE Throughput Limit to allow high bandwidth
base_settings = (
f"pylonsrc device-serial-number={serial} "
"cam::TriggerMode=Off "
"cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 "
"cam::DeviceLinkThroughputLimitMode=Off "
)
# Pre-scaler
pre_scale = (
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={INTERNAL_WIDTH}, height={INTERNAL_HEIGHT}, framerate=60/1 ! "
)
if is_color:
# --- 2A. COLOR SETTINGS (High Speed) ---
# FIX: Force ExposureTime=20000.0 (20ms) even for Color.
# If we leave it on Auto, it will slow down the Mono cameras.
# We rely on 'GainAuto' to make the image bright enough.
color_settings = (
f"{base_settings} "
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 "
"cam::GainAuto=Continuous "
"cam::Width=1920 cam::Height=1080 cam::OffsetX=336 cam::OffsetY=484 "
"cam::PixelFormat=BayerBG8 " # Force Format
)
source = (
f"{color_settings} ! "
"bayer2rgb ! " # Debayer
"videoconvert ! "
"video/x-raw,format=RGBA ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12 ! "
f"{pre_scale}"
f"m.sink_{i} "
)
else:
# --- 2B. MONO SETTINGS (High Speed) ---
# Force ExposureTime=20000.0
mono_settings = (
f"{base_settings} "
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 "
"cam::GainAuto=Continuous "
)
if cam_info['binning']:
mono_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
source = (
f"{mono_settings} ! "
"video/x-raw,format=GRAY8 ! "
"videoconvert ! "
"video/x-raw,format=I420 ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12 ! "
f"{pre_scale}"
f"m.sink_{i} "
)
else:
# --- DISCONNECTED PLACEHOLDER ---
source = (
f"videotestsrc pattern=black is-live=true ! "
f"videorate ! "
f"video/x-raw,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},format=I420,framerate=60/1 ! "
f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM),format=NV12,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},framerate=60/1 ! "
f"m.sink_{i} "
)
sources_str += source
# 3. MUXER & PROCESSING
processing = (
f"nvstreammux name=m batch-size={TARGET_NUM_CAMS} width={INTERNAL_WIDTH} height={INTERNAL_HEIGHT} "
f"live-source=1 batched-push-timeout=33000 ! "
f"nvmultistreamtiler width={WEB_WIDTH} height={WEB_HEIGHT} rows=1 columns={TARGET_NUM_CAMS} ! "
"nvvideoconvert compute-hw=1 ! "
"video/x-raw(memory:NVMM) ! "
"videorate drop-only=true ! "
"video/x-raw(memory:NVMM), framerate=30/1 ! "
f"nvjpegenc quality=60 ! "
"appsink name=sink emit-signals=True sync=False max-buffers=1 drop=True"
)
pipeline_str = f"{sources_str} {processing}"
print(f"Launching Optimized Pipeline (All Cams Forced to 20ms Shutter)...")
self.pipeline = Gst.parse_launch(pipeline_str)
appsink = self.pipeline.get_by_name("sink")
appsink.connect("new-sample", self.on_new_sample)
# --- FLASK ---
@app.route('/')
def index():
return render_template_string('''
<html>
<head>
<style>
body { background-color: #111; color: white; text-align: center; font-family: monospace; margin: 0; padding: 20px; }
.container { position: relative; display: inline-block; border: 3px solid #4CAF50; }
img { display: block; max-width: 100%; height: auto; }
.hud {
position: absolute; top: 10px; left: 10px;
background: rgba(0, 0, 0, 0.6); color: #00FF00;
padding: 5px 10px; font-weight: bold; pointer-events: none;
}
</style>
</head>
<body>
<h1>Basler Final Feed</h1>
<div class="container">
<div class="hud" id="fps-counter">FPS: --</div>
<img src="{{ url_for('video_feed') }}">
</div>
<script>
setInterval(function() {
fetch('/get_fps').then(r => r.json()).then(d => {
document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
});
}, 500);
</script>
</body>
</html>
''')
@app.route('/video_feed')
def video_feed():
def generate():
count = 0
while True:
with buffer_lock:
if frame_buffer:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame_buffer + b'\r\n')
time.sleep(0.016)
count += 1
if count % 200 == 0: gc.collect()
return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/get_fps')
def get_fps():
return jsonify(fps=round(current_fps, 1))
if __name__ == "__main__":
subprocess.run([sys.executable, "-c", "import gc; gc.collect()"])
gst_thread = GStreamerPipeline()
gst_thread.daemon = True
gst_thread.start()
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

View File

@ -1,436 +0,0 @@
body {
background-color: #1a1a1a; /* Darker gray */
color: #ffffff;
font-family: Arial, sans-serif; /* Reverted to original font */
margin: 0;
padding-top: 20px; /* Added padding to top for overall spacing */
padding-bottom: 20px; /* Added padding to bottom for overall spacing */
box-sizing: border-box; /* Ensure padding is included in height */
display: flex; /* Changed to flex */
flex-direction: column; /* Set flex direction to column */
height: 100vh; /* Make body fill viewport height */
gap: 20px; /* Added gap between flex items (h1 and main-container) */
}
h1 {
color: #64ffda; /* Kept existing color */
text-align: center;
margin: 0; /* Removed explicit margins */
}
.main-container {
display: flex; /* Desktop default */
flex-direction: row;
flex-grow: 1; /* Make main-container fill remaining vertical space */
width: 100%;
/* Removed max-width to allow full screen utilization */
margin: 0 auto;
/* Removed height: calc(100vh - 80px); */
/* Removed padding: 20px; */
box-sizing: border-box; /* Ensure padding is included in element's total width and height */
gap: 20px; /* Added spacing between the two main sections */
}
/* Tabs are hidden by default on desktop, dynamically added for mobile */
.tabs {
display: none;
}
.content-section {
display: block; /* Desktop default */
padding: 5px; /* Reduced padding further */
overflow-y: auto;
}
/* --- Lamp View (Original styles adapted to dark theme) --- */
.lamp-view {
flex: 0 0 auto; /* Allow content to determine width, do not shrink */
/* Removed min-width as padding will affect total width */
padding-left: 2vw; /* Added 2vw padding on the left side */
padding-right: 2vw; /* Added 2vw padding on the right side */
border-right: 1px solid #333; /* Reintroduced the line separating the sections */
display: flex;
flex-direction: column;
align-items: center;
overflow-y: auto; /* Added to allow vertical scrolling if its content is too tall */
}
.lamp-view .container { /* Added for original styling effect */
display: flex;
flex-direction: column;
align-items: center;
position: relative;
width: 100%;
}
.lamp-view .main-content { /* Added for original styling effect */
display: flex;
flex-direction: column; /* Changed to column to stack matrix and controls vertically */
align-items: center; /* Changed to center to horizontally center its children */
gap: 20px; /* Adjusted gap for vertical stacking */
flex-wrap: wrap; /* Allow wrapping for responsiveness - not strictly needed for column but kept for safety */
justify-content: center; /* This will center the column within the lamp-view if its width allows */
width: 100%; /* Ensure main-content fills lamp-view's width */
}
.matrix-grid {
display: grid;
grid-template-columns: repeat(5, 70px); /* Fixed 5-column grid */
grid-template-rows: repeat(5, 70px);
gap: 20px;
padding: 20px;
background-color: #333;
border-radius: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
margin-bottom: 20px; /* Kept margin-bottom for spacing below grid */
/* Removed width: 100%; to let grid determine its own width */
box-sizing: border-box; /* Account for padding */
}
.lamp {
width: 70px;
height: 70px;
border-radius: 10%; /* Reverted to original square with rounded corners */
background-color: #000;
transition: box-shadow 0.2s, transform 0.1s;
cursor: pointer;
border: 2px solid transparent;
}
.lamp.on {
box-shadow: 0 0 15px currentColor, 0 0 25px currentColor;
}
.lamp.selected {
border: 2px solid #fff;
transform: scale(1.1);
}
.region-control {
margin-bottom: 20px; /* Kept margin-bottom for spacing below region-control */
/* Removed text-align: center; as parent's align-items will handle centering */
width: 470px; /* Explicitly set width to match matrix grid */
box-sizing: border-box; /* Ensure padding/border included in width */
}
.region-control select {
padding: 10px 15px;
font-size: 14px;
cursor: pointer;
border: 1px solid #64ffda; /* Adapted to theme */
border-radius: 5px;
background-color: #333; /* Adapted to theme */
color: #ffffff;
width: 100%; /* Fill parent's width */
box-sizing: border-box; /* Include padding in width */
}
.control-panel, .center-lamp-control {
background-color: #444; /* Adapted to theme */
padding: 20px;
border-radius: 10px;
width: 470px; /* Explicitly set width to match matrix grid */
margin-bottom: 20px; /* Kept margin-bottom for spacing below control panel */
box-sizing: border-box; /* Account for padding */
}
.control-panel.inactive-control {
background-color: #333;
filter: saturate(0.2);
}
.control-panel.inactive-control .slider-row {
pointer-events: none;
}
.control-panel h2, .center-lamp-control h2 {
color: #64ffda; /* Adapted to theme */
font-size: 16px;
margin-bottom: 10px;
text-align: center;
}
.slider-group {
width: 100%;
display: flex;
flex-direction: column;
gap: 5px;
}
.slider-row {
display: grid;
grid-template-columns: 150px 1fr 50px; /* Adjusted last column for number input buttons */
gap: 10px;
align-items: center;
}
.slider-group input[type="range"] {
-webkit-appearance: none;
height: 8px;
border-radius: 5px;
outline: none;
cursor: pointer;
background: #555; /* Adapted to theme */
}
.slider-group input[type="number"] {
-webkit-appearance: none; /* Hide default spinner for Chrome, Safari */
-moz-appearance: textfield; /* Hide default spinner for Firefox */
text-align: center; /* Center the number */
width: auto; /* Allow flex-grow to manage width */
font-size: 14px;
border: none; /* Will be part of the new control's border */
border-radius: 0; /* No radius on its own if part of a group */
padding: 5px;
background-color: #333; /* Adapted to theme */
color: #ffffff;
}
/* Specifically hide number input spinner buttons */
.slider-group input[type="number"]::-webkit-inner-spin-button,
.slider-group input[type="number"]::-webkit-outer-spin-button {
-webkit-appearance: none;
margin: 0;
}
.slider-group input[type="range"]::-webkit-slider-thumb {
-webkit-appearance: none;
height: 20px;
width: 20px;
border-radius: 50%;
background: #64ffda; /* Adapted to theme */
cursor: pointer;
box-shadow: 0 0 5px rgba(0,0,0,0.5);
margin-top: 2px;
}
.slider-group input[type="range"]::-webkit-slider-runnable-track {
height: 24px;
border-radius: 12px;
}
input.white-3000k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #ffc080); }
input.white-6500k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #c0e0ff); }
input.blue::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #00f); }
.slider-label {
color: #ffffff; /* Adapted to theme */
font-size: 14px;
text-align: left;
white-space: nowrap;
width: 120px;
}
.inactive-control .slider-label {
color: #888;
}
/* --- New styles for number input controls --- */
.number-input-controls {
display: flex;
align-items: stretch; /* Stretch children to fill container height */
gap: 2px; /* Small gap between buttons and input */
flex-shrink: 0; /* Prevent the control group from shrinking in the grid */
}
.number-input-controls input[type="number"] {
flex-grow: 1; /* Make it fill available space */
text-align: center;
border: 1px solid #64ffda; /* Border for the number input */
border-radius: 5px;
background-color: #333;
color: #ffffff;
min-width: 40px; /* Ensure it doesn't get too small */
}
.number-input-controls button {
width: 30px; /* Fixed width */
background-color: #64ffda; /* Accent color */
color: #1a1a1a; /* Dark text */
border: none;
border-radius: 5px;
font-size: 16px;
font-weight: bold;
cursor: pointer;
transition: background-color 0.2s;
display: flex; /* Center content */
justify-content: center;
align-items: center;
line-height: 1; /* Prevent extra height from line-height */
padding: 0; /* Remove default button padding */
}
.number-input-controls button:hover {
background-color: #4ed8bd; /* Lighter accent on hover */
}
.number-input-controls button:active {
background-color: #3cb89f; /* Darker accent on click */
}
/* Adjust slider-row grid to accommodate new number input controls */
.slider-row {
grid-template-columns: 150px 1fr 100px; /* Label, Range, NumberInputGroup(approx 30+30+2+40=102px) */
}
/* --- Camera View (Individual streams) --- */
.camera-view {
flex: 1; /* Allow it to grow and shrink to fill available space */
height: 100%; /* Added to make it fill the height of its parent */
overflow-y: auto; /* Added to allow vertical scrolling if content exceeds height */
/* Removed width: 75%; */
display: flex;
flex-direction: column;
align-items: center;
justify-content: flex-start; /* Align items to start for title */
position: relative;
gap: 10px; /* Space between elements */
}
.camera-streams-grid {
display: grid; /* Use CSS Grid */
/* Removed width: 100%; */
/* Removed height: 100%; */
flex-grow: 1; /* Allow it to grow to fill available space */
grid-template-rows: 1fr 2fr; /* 1/3 for color, 2/3 for monos */
grid-template-columns: 1fr; /* Single column for the main layout */
gap: 10px;
padding: 0 5px; /* Reduced horizontal padding */
}
.camera-color-row {
grid-row: 1;
grid-column: 1;
display: flex;
justify-content: center;
align-items: center;
overflow: hidden; /* Ensure content is clipped */
height: 100%; /* Explicitly set height to fill grid cell */
}
.camera-mono-row {
grid-row: 2;
grid-column: 1;
display: grid;
grid-template-columns: 1fr 1fr; /* Two columns for the mono cameras */
gap: 10px;
overflow: hidden; /* Ensure content is clipped */
height: 100%; /* Explicitly set height to fill grid cell */
}
.camera-container-individual {
position: relative;
border: 1px solid #333;
display: flex; /* Changed to flex for centering image */
justify-content: center;
align-items: center;
background-color: transparent;
aspect-ratio: var(--aspect-ratio); /* Keep aspect-ratio on container */
max-width: 100%; /* Re-added max-width */
/* Removed height: 100%; */
max-height: 100%; /* Ensure it doesn't exceed the boundaries of its parent */
overflow: hidden; /* Ensure image fits and is clipped if necessary */
box-sizing: border-box; /* Include padding and border in the element's total width and height */
border-radius: 10px; /* Added corner radius */
}
.camera-stream-individual {
max-width: 100%;
max-height: 100%;
object-fit: contain;
border-radius: 10px; /* Added corner radius to the image itself */
}
.camera-label {
position: absolute;
bottom: 5px;
left: 5px;
background: rgba(0, 0, 0, 0.6);
color: #fff;
padding: 3px 6px;
font-size: 12px;
border-radius: 3px;
}
.hud {
position: absolute; /* Kept existing position for FPS counter */
top: 10px;
right: 10px; /* Moved to right for better placement in new layout */
background: rgba(0, 0, 0, 0.6);
color: #00FF00;
padding: 5px 10px;
font-weight: bold;
pointer-events: none;
}
/* --- Responsive Design --- */
@media (max-width: 768px) {
.main-container {
flex-direction: column;
height: auto;
max-width: 100%;
}
.tabs {
display: flex; /* Show tabs on mobile */
justify-content: space-around;
background-color: #333;
padding: 10px 0;
}
.tab-link {
background-color: #333;
color: #ffffff;
border: none;
padding: 10px 15px;
cursor: pointer;
transition: background-color 0.3s;
}
.tab-link.active {
background-color: #64ffda;
color: #1a1a1a;
}
.lamp-view, .camera-view {
width: 100%;
border: none;
}
.content-section {
display: none; /* Hide tab content by default on mobile */
}
.content-section.active {
display: block; /* Show active tab content on mobile */
}
.lamp-view .main-content {
flex-direction: column;
align-items: center;
}
.control-panel, .center-lamp-control {
width: 100%;
max-width: none;
}
.camera-streams-grid {
/* On mobile, stack cameras */
grid-template-rows: auto; /* Revert to auto rows */
grid-template-columns: 1fr; /* Single column */
padding: 0;
}
.camera-color-row, .camera-mono-row {
grid-row: auto;
grid-column: auto;
display: flex; /* Change mono-row to flex for stacking vertically on mobile */
flex-direction: column;
gap: 10px;
}
.camera-container-individual {
width: 100%;
height: auto; /* Let aspect-ratio define height */
}
}

View File

@ -1,423 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Pupilometer Unified Control</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
</head>
<body>
<h1>Pupilometer Unified Control</h1>
<div class="main-container">
<!-- The content sections will be populated based on the view -->
<div id="lamp" class="content-section lamp-view">
<!-- Lamp Control UI goes here -->
<div class="container">
<h2>Lamp Matrix Control</h2>
<div class="region-control">
<label for="region-select">Select Region:</label>
<select id="region-select">
<option value="" disabled selected>-- Select a region --</option>
<option value="Upper">Upper</option>
<option value="Lower">Lower</option>
<option value="Left">Left</option>
<option value="Right">Right</option>
<option value="Inner ring">Inner ring</option>
<option value="Outer ring">Outer ring</option>
<option value="All">All</option>
</select>
</div>
<div class="main-content">
<div class="matrix-grid">
{% for row in range(5) %}
{% for col in range(5) %}
<div class="lamp" data-row="{{ row }}" data-col="{{ col }}" style="background-color: {{ matrix[row][col] }};"></div>
{% endfor %}
{% endfor %}
</div>
<div class="slider-controls">
<div class="center-lamp-control">
<h2>Center Lamp</h2>
<div class="slider-group center-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="center-ww-slider" min="0" max="255" value="0" class="white-3000k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-ww-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="center-cw-slider" min="0" max="255" value="0" class="white-6500k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-cw-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="center-blue-slider" min="0" max="255" value="0" class="blue">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-blue-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
</div>
</div>
<div class="control-panel">
<h2>Selected Region</h2>
<div class="slider-group region-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="ww-slider" min="0" max="255" value="0" class="white-3000k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="ww-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="cw-slider" min="0" max="255" value="0" class="white-6500k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="cw-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="blue-slider" min="0" max="255" value="0" class="blue">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="blue-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<div id="camera" class="content-section camera-view">
<h2>Basler Final Feed</h2>
<div class="camera-streams-grid">
<div class="camera-color-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if cam_info.is_color %}
<div class="camera-container-individual {% if cam_info.is_color %}camera-color{% else %}camera-mono{% endif %}" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('video_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual">
<div class="camera-label">{{ cam_info.model }} ({{ 'Color' if cam_info.is_color else 'Mono' }})</div>
</div>
{% endif %}
{% endfor %}
</div>
<div class="camera-mono-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if not cam_info.is_color %}
<div class="camera-container-individual {% if cam_info.is_color %}camera-color{% else %}camera-mono{% endif %}" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('video_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual">
<div class="camera-label">{{ cam_info.model }} ({{ 'Color' if cam_info.is_color else 'Mono' }})</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>
<div class="hud" id="fps-counter">FPS: --</div>
</div>
<div id="segmentation" class="content-section camera-view">
<h2>Segmentation Feed</h2>
<div class="camera-streams-grid">
<div class="camera-mono-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if not cam_info.is_color %}
<div class="camera-container-individual camera-mono" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('segmentation_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual" id="segmentation-feed-{{- cam_index -}}">
<div class="camera-label">{{ cam_info.model }} (Segmentation)</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>
</div>
</div>
<script>
// FPS counter
setInterval(function() {
fetch('/get_fps').then(r => r.json()).then(d => {
document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
});
}, 500);
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
var selectedLamps = [];
// Function to calculate a visual RGB color from the three light values using a proper additive model
function calculateRgb(ww, cw, blue) {
const warmWhiteR = 255, warmWhiteG = 192, warmWhiteB = 128;
const coolWhiteR = 192, coolWhiteG = 224, coolWhiteB = 255;
const blueR = 0, blueG = 0, blueB = 255;
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR;
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteR + (blue / 255) * blueG;
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB;
r = Math.min(255, Math.round(r));
g = Math.min(255, Math.round(g));
b = Math.min(255, Math.round(b));
var toHex = (c) => ('0' + c.toString(16)).slice(-2);
return '#' + toHex(r) + toHex(g) + toHex(b);
}
function updateLampUI(lamp, colorState) {
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue);
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`);
lampElement.css('background-color', newColor);
if (newColor === '#000000') {
lampElement.removeClass('on');
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`);
} else {
lampElement.addClass('on');
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`);
}
}
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) {
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww,
cw: lamp.cw,
blue: lamp.blue
})));
$.ajax({
url: '/set_matrix',
type: 'POST',
contentType: 'application/json',
data: JSON.stringify({ matrix: fullMatrixData }),
success: function(response) {
if (response.success) {
if (isRegionUpdate) {
for (var r = 0; r < 5; r++) {
for (var c = 0; c < 5; c++) {
updateLampUI({row: r, col: c}, lampMatrixState[r][c]);
}
}
} else {
lampsToUpdate.forEach(function(lamp) {
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]);
});
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
var allRegionWithoutCenter = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
regionMaps['All'] = allRegionWithoutCenter;
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: rgb[0], cw: rgb[1], blue: rgb[2]
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
$('.lamp').removeClass('selected');
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill({ww: 0, cw: 0, blue: 0}));
lampMatrixState[2][2] = centerLampState;
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
sendFullMatrixUpdate(lampsToUpdate, true);
});
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Handle increment/decrement buttons
$('.number-input-controls button').on('click', function() {
var btn = $(this);
var numberInput = btn.siblings('input[type="number"]');
var currentVal = parseInt(numberInput.val());
var min = parseInt(numberInput.attr('min'));
var max = parseInt(numberInput.attr('max'));
if (btn.hasClass('decrement-btn')) {
currentVal = Math.max(min, currentVal - 1);
} else if (btn.hasClass('increment-btn')) {
currentVal = Math.min(max, currentVal + 1);
}
numberInput.val(currentVal);
// Trigger the 'input' event to propagate the change to the slider and matrix update logic
numberInput.trigger('input');
});
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
// Mobile tab handling
if (window.innerWidth <= 768) {
// Dynamically add tab buttons
const tabsDiv = $('<div class="tabs"></div>');
tabsDiv.append('<button class="tab-link" data-tab="camera">Camera</button>');
tabsDiv.append('<button class="tab-link" data-tab="lamp">Lamp Control</button>');
// Prepend tabsDiv to .main-container
$('.main-container').prepend(tabsDiv);
// Hide all content sections initially
$('.content-section').hide();
// Show the camera section by default
$('#camera').show();
// Make the Camera tab active
$('.tab-link[data-tab="camera"]').addClass('active');
// Add click handlers for tab buttons
$('.tab-link').on('click', function() {
$('.tab-link').removeClass('active');
$(this).addClass('active');
$('.content-section').hide();
$(`#${$(this).data('tab')}`).show();
});
}
});
</script>
</body>
</html>

View File

@ -1,58 +0,0 @@
from pypylon import pylon
import time
import sys
try:
# Get the Transport Layer Factory
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("No cameras found!")
sys.exit(1)
print(f"Found {len(devices)} cameras. Checking Camera 1...")
# Connect to first camera
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
cam.Open()
# 1. Reset to Defaults
print("Reseting to Defaults...")
cam.UserSetSelector.Value = "Default"
cam.UserSetLoad.Execute()
# 2. Enable Auto Exposure/Gain
print("Enabling Auto Exposure & Gain...")
cam.ExposureAuto.Value = "Continuous"
cam.GainAuto.Value = "Continuous"
# 3. Wait for it to settle (Camera adjusts to light)
print("Waiting 3 seconds for auto-adjustment...")
for i in range(3):
print(f"{3-i}...")
time.sleep(1)
# 4. READ VALUES
current_exposure = cam.ExposureTime.GetValue() # In Microseconds (us)
current_fps_readout = cam.ResultingFrameRate.GetValue()
print("-" * 30)
print(f"REPORT FOR SERIAL: {cam.GetDeviceInfo().GetSerialNumber()}")
print("-" * 30)
print(f"Current Exposure Time: {current_exposure:.1f} us ({current_exposure/1000:.1f} ms)")
print(f"Theoretical Max FPS: {1000000 / current_exposure:.1f} FPS")
print(f"Camera Internal FPS: {current_fps_readout:.1f} FPS")
print("-" * 30)
if current_exposure > 33000:
print("⚠️ PROBLEM FOUND: Exposure is > 33ms.")
print(" This physically prevents the camera from reaching 30 FPS.")
print(" Solution: Add more light or limit AutoExposureUpperLimit.")
else:
print("✅ Exposure looks fast enough for 30 FPS.")
cam.Close()
except Exception as e:
print(f"Error: {e}")

View File

@ -1,16 +0,0 @@
#!/bin/bash
# Test the main page
echo "Testing main page..."
curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/
echo ""
# Test the get_fps endpoint
echo "Testing get_fps endpoint..."
curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/get_fps
echo ""
# Test the set_matrix endpoint
echo "Testing set_matrix endpoint..."
curl -s -o /dev/null -w "%{http_code}" -X POST -H "Content-Type: application/json" -d '{"matrix": [[{"ww":0,"cw":0,"blue":0}]]}' http://localhost:5000/set_matrix
echo ""

View File

@ -1,52 +0,0 @@
import re
from playwright.sync_api import Page, expect
def test_ui_elements_mobile(page: Page):
page.set_viewport_size({"width": 375, "height": 667})
page.goto("http://localhost:5000/")
# Check for main title
expect(page).to_have_title("Pupilometer Unified Control")
# Wait for dynamically added tabs to be attached to the DOM
page.wait_for_selector(".tabs", state="attached")
# Check for dynamically added tabs visibility on mobile
expect(page.locator(".tabs")).to_be_visible()
expect(page.locator(".tab-link[data-tab='camera']")).to_be_visible()
expect(page.locator(".tab-link[data-tab='lamp']")).to_be_visible()
# Check for camera view content
expect(page.locator("#camera h2")).to_contain_text("Basler Final Feed")
expect(page.locator("#fps-counter")).to_be_visible()
expect(page.locator("#camera .camera-streams-grid .camera-container-individual")).to_have_count(3)
expect(page.locator(".camera-streams-grid .camera-label").first).to_be_visible()
# Check for lamp view content
page.locator(".tab-link[data-tab='lamp']").click()
expect(page.locator("#lamp .container > h2")).to_contain_text("Lamp Matrix Control")
expect(page.locator("#region-select")).to_be_visible()
expect(page.locator(".center-lamp-control h2")).to_contain_text("Center Lamp")
expect(page.locator(".control-panel h2")).to_contain_text("Selected Region")
def test_ui_elements_desktop(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
# Check for main title
expect(page).to_have_title("Pupilometer Unified Control")
# Check that tabs are NOT visible on desktop
expect(page.locator(".tabs")).not_to_be_visible()
# Check for camera view content
expect(page.locator("#camera h2")).to_contain_text("Basler Final Feed")
expect(page.locator("#fps-counter")).to_be_visible()
expect(page.locator("#camera .camera-streams-grid .camera-container-individual")).to_have_count(3)
expect(page.locator(".camera-streams-grid .camera-label").first).to_be_visible()
# Check for lamp view content
expect(page.locator("#lamp .container > h2")).to_contain_text("Lamp Matrix Control")
expect(page.locator("#region-select")).to_be_visible()
expect(page.locator(".center-lamp-control h2")).to_contain_text("Center Lamp")
expect(page.locator(".control-panel h2")).to_contain_text("Selected Region")

View File

@ -1,126 +0,0 @@
import re
from playwright.sync_api import Page, expect
def test_visual_regression_desktop(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_desktop.png")
def test_visual_regression_tablet(page: Page):
page.set_viewport_size({"width": 768, "height": 1024}) # Common tablet size
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_tablet.png")
def test_visual_regression_mobile(page: Page):
page.set_viewport_size({"width": 375, "height": 667})
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_mobile.png")
def test_camera_layout_dimensions(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
# Wait for camera streams to load
page.wait_for_selector('img[src*="video_feed"]')
# Get bounding boxes for the key layout elements
camera_streams_grid_box = page.locator('#camera .camera-streams-grid').bounding_box()
color_camera_row_box = page.locator('#camera .camera-color-row').bounding_box()
mono_camera_row_box = page.locator('#camera .camera-mono-row').bounding_box()
assert camera_streams_grid_box is not None, "Camera streams grid not found"
assert color_camera_row_box is not None, "Color camera row not found"
assert mono_camera_row_box is not None, "Mono camera row not found"
# Define a small tolerance for floating point comparisons
tolerance = 7 # pixels, increased slightly for robust testing across browsers/OS
# 1. Check vertical positioning and 1/3, 2/3 height distribution
# The grid's 1fr 2fr distribution applies to the space *after* accounting for gaps.
grid_internal_gap_height = 10 # Defined in .camera-streams-grid gap property
total_distributable_height = camera_streams_grid_box['height'] - grid_internal_gap_height
expected_color_row_height = total_distributable_height / 3
expected_mono_row_height = total_distributable_height * 2 / 3
assert abs(color_camera_row_box['height'] - expected_color_row_height) < tolerance, \
f"Color camera row height is {color_camera_row_box['height']}, expected {expected_color_row_height} (1/3 of distributable height)"
assert abs(mono_camera_row_box['height'] - expected_mono_row_height) < tolerance, \
f"Mono camera row height is {mono_camera_row_box['height']}, expected {expected_mono_row_height} (2/3 of distributable height)"
# Check vertical stacking - top of mono row should be roughly at bottom of color row + gap
assert abs(mono_camera_row_box['y'] - (color_camera_row_box['y'] + color_camera_row_box['height'] + grid_internal_gap_height)) < tolerance, \
"Mono camera row is not positioned correctly below the color camera row with the expected gap."
# 2. Check horizontal padding (5px on each side of .camera-streams-grid)
grid_left_edge = camera_streams_grid_box['x']
grid_right_edge = camera_streams_grid_box['x'] + camera_streams_grid_box['width']
color_row_left_edge = color_camera_row_box['x']
color_row_right_edge = color_camera_row_box['x'] + color_camera_row_box['width']
mono_row_left_edge = mono_camera_row_box['x']
mono_row_right_edge = mono_camera_row_box['x'] + mono_camera_row_box['width']
# The content rows should align with the grid's padding
assert abs(color_row_left_edge - (grid_left_edge + 5)) < tolerance, \
f"Color camera row left edge is {color_row_left_edge}, expected {grid_left_edge + 5} (grid left + 5px padding)"
assert abs(grid_right_edge - color_row_right_edge - 5) < tolerance, \
f"Color camera row right edge is {color_row_right_edge}, expected {grid_right_edge - 5} (grid right - 5px padding)"
assert abs(mono_row_left_edge - (grid_left_edge + 5)) < tolerance, \
f"Mono camera row left edge is {mono_row_left_edge}, expected {grid_left_edge + 5} (grid left + 5px padding)"
assert abs(grid_right_edge - mono_row_right_edge - 5) < tolerance, \
f"Mono camera row right edge is {mono_row_right_edge}, expected {grid_right_edge - 5} (grid right - 5px padding)"
# 3. Verify no "behind" effect - check if mono camera row box's top is below color camera row's bottom
# This is implicitly covered by the vertical stacking check, but can be explicit for clarity
assert mono_camera_row_box['y'] > color_camera_row_box['y'] + color_camera_row_box['height'], \
"Mono camera row is visually overlapping the color camera row."
# 4. Check that individual camera containers tightly wrap their images
color_cam_container = page.locator('.camera-color-row .camera-container-individual')
color_cam_img = color_cam_container.locator('.camera-stream-individual')
if color_cam_container.count() > 0:
color_container_box = color_cam_container.bounding_box()
color_img_box = color_cam_img.bounding_box()
assert color_container_box is not None, "Color camera container not found for image fit check"
assert color_img_box is not None, "Color camera image not found for image fit check"
assert abs(color_container_box['width'] - color_img_box['width']) < tolerance, \
f"Color camera container width ({color_container_box['width']}) does not match image width ({color_img_box['width']})"
assert abs(color_container_box['height'] - color_img_box['height']) < tolerance, \
f"Color camera container height ({color_container_box['height']}) does not match image height ({color_img_box['height']})"
mono_cam_containers = page.locator('#camera .camera-mono-row .camera-container-individual').all()
for i, mono_cam_container in enumerate(mono_cam_containers):
mono_cam_img = mono_cam_container.locator('.camera-stream-individual')
mono_container_box = mono_cam_container.bounding_box()
mono_img_box = mono_cam_img.bounding_box()
assert mono_container_box is not None, f"Mono camera container {i} not found for image fit check"
assert mono_img_box is not None, f"Mono camera image {i} not found for image fit check"
assert abs(mono_container_box['width'] - mono_img_box['width']) < tolerance, \
f"Mono camera container {i} width ({mono_container_box['width']}) does not match image width ({mono_img_box['width']})"
assert abs(mono_container_box['height'] - mono_img_box['height']) < tolerance, \
f"Mono camera container {i} height ({mono_container_box['height']}) does not match image height ({mono_img_box['height']})"
# Optionally, check that individual mono cameras are side-by-side within their row
mono_cams = page.locator('#camera .camera-mono').all()
assert len(mono_cams) == 2, "Expected two mono cameras"
if len(mono_cams) == 2:
mono_cam_1_box = mono_cams[0].bounding_box()
mono_cam_2_box = mono_cams[1].bounding_box()
assert mono_cam_1_box is not None and mono_cam_2_box is not None, "Mono camera boxes not found"
# Check horizontal alignment
assert abs(mono_cam_1_box['y'] - mono_cam_2_box['y']) < tolerance, \
"Mono cameras are not horizontally aligned."
# Check side-by-side positioning (cam 2 should be to the right of cam 1)
assert mono_cam_2_box['x'] > mono_cam_1_box['x'] + mono_cam_1_box['width'] - tolerance, \
"Mono cameras are not side-by-side as expected."

View File

@ -1,21 +0,0 @@
import pytest
from pypylon import pylon
@pytest.fixture(scope="session")
def camera_available():
"""
Pytest fixture that checks for a connected Basler camera.
If no camera is found, it skips the tests that depend on this fixture.
"""
try:
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
pytest.skip("No Basler camera found. Skipping tests that require a camera.")
# You can also add a photo capture test here if you want
# For now, just detecting the camera is enough
except Exception as e:
pytest.fail(f"An error occurred during camera detection: {e}")

View File

@ -1,52 +0,0 @@
import pytest
from pypylon import pylon
import cv2
@pytest.mark.usefixtures("camera_available")
def test_capture_photo():
"""
Tests that a photo can be captured from the Basler camera.
This test depends on the `camera_available` fixture in conftest.py.
"""
try:
# Get the transport layer factory.
tl_factory = pylon.TlFactory.GetInstance()
# Get all attached devices and exit application if no device is found.
devices = tl_factory.EnumerateDevices()
# Only grab from the first camera found
camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
camera.Open()
# Max number of images to grab
countOfImagesToGrab = 1
# Create an image format converter
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
# Start grabbing continuously
camera.StartGrabbingMax(countOfImagesToGrab)
img = None
while camera.IsGrabbing():
grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
# Access the image data
image = converter.Convert(grabResult)
img = image.GetArray()
grabResult.Release()
camera.Close()
assert img is not None, "Failed to capture an image."
assert img.shape[0] > 0, "Captured image has zero height."
assert img.shape[1] > 0, "Captured image has zero width."
except Exception as e:
pytest.fail(f"An error occurred during photo capture: {e}")

View File

@ -1,133 +0,0 @@
import pytest
import subprocess
import time
import requests
import os
import sys
from playwright.sync_api import Page, expect
# Define the host and port for the application
HOST = "127.0.0.1"
PORT = 5000
BASE_URL = f"http://{HOST}:{PORT}"
STDOUT_FILE = "app_stdout.log"
STDERR_FILE = "app_stderr.log"
@pytest.fixture(scope="module")
def run_app():
"""
Fixture to run the Flask application in a test environment.
"""
# Set the environment variable for the subprocess
env = os.environ.copy()
env["PUPILOMETER_ENV"] = "test"
command = [sys.executable, "-u", "app.py"]
with open(STDOUT_FILE, "w") as stdout_f, open(STDERR_FILE, "w") as stderr_f:
process = subprocess.Popen(
command,
cwd="src/controllerSoftware",
stdout=stdout_f,
stderr=stderr_f,
text=True,
env=env
)
# Wait for the app to start
start_time = time.time()
while True:
if os.path.exists(STDERR_FILE):
with open(STDERR_FILE, "r") as f:
if "* Running on http" in f.read():
break
if time.time() - start_time > 15:
raise TimeoutError("Flask app failed to start in time.")
time.sleep(0.5)
yield process
process.terminate()
process.wait()
# Read stdout and stderr for debugging
with open(STDOUT_FILE, "r") as f:
print("App STDOUT:\n", f.read())
with open(STDERR_FILE, "r") as f:
print("App STDERR:\n", f.read())
if os.path.exists(STDOUT_FILE):
os.remove(STDOUT_FILE)
if os.path.exists(STDERR_FILE):
os.remove(STDERR_FILE)
def test_program_output(run_app):
"""
Tests that the mock backend is initialized.
"""
with open(STDERR_FILE, "r") as f:
stderr = f.read()
assert "Initializing Mock backend" in stderr
assert "MockBackend initialized." in stderr
def test_curl_output(run_app):
"""
Tests the API endpoints using requests (similar to curl).
"""
# Test the /ble_status endpoint
response_ble = requests.get(f"{BASE_URL}/ble_status")
assert response_ble.status_code == 200
assert response_ble.json() == {"connected": True} # In DEBUG_MODE
# Test the /vision/pupil_data endpoint
response_vision = requests.get(f"{BASE_URL}/vision/pupil_data")
assert response_vision.status_code == 200
assert "data" in response_vision.json()
assert "success" in response_vision.json()
def test_playwright_checks(page: Page, run_app):
"""
Performs basic and visual checks using Playwright.
"""
page.goto(BASE_URL)
# Basic output check: Title and heading
expect(page).to_have_title("Lamp Matrix Control")
heading = page.locator("h1")
expect(heading).to_have_text("Lamp Matrix Control")
# Pupil detection UI check
pupil_detection_section = page.locator("#pupil-detection")
expect(pupil_detection_section).to_be_visible()
expect(pupil_detection_section.locator("h2")).to_have_text("Pupil Detection")
pupil_canvas = page.locator("#pupil-canvas")
expect(pupil_canvas).to_be_visible()
pupil_center = page.locator("#pupil-center")
pupil_area = page.locator("#pupil-area")
expect(pupil_center).to_be_visible()
expect(pupil_area).to_be_visible()
# Wait for the pupil data to be updated
time.sleep(1)
expect(pupil_center).not_to_have_text("(x, y)")
expect(pupil_area).not_to_have_text("0")
# Camera stream UI check
camera_feed_section = page.locator("#video-feed")
expect(camera_feed_section).to_be_visible()
expect(camera_feed_section.locator("h2")).to_have_text("Camera Feed")
video_feed_img = page.locator("#video-feed img")
expect(video_feed_img).to_be_visible()
expect(video_feed_img).to_have_attribute("src", "/video_feed")
# Visual check: Screenshot
os.makedirs("screenshots", exist_ok=True)
screenshot_path = "screenshots/homepage.png"
page.screenshot(path=screenshot_path)
assert os.path.exists(screenshot_path)

View File

@ -1,18 +0,0 @@
import cv2
import time
import pytest
from playwright.sync_api import Page, expect
def test_segmentation_output(page: Page):
page.goto("http://localhost:5000/")
# Check for the presence of a segmentation feed for the first mono camera (stream 1)
segmentation_feed = page.locator("#segmentation-feed-1")
expect(segmentation_feed).to_be_visible()
# Verify that the segmentation feed is updating
initial_src = segmentation_feed.get_attribute("src")
page.reload()
page.wait_for_selector("#segmentation-feed-1")
new_src = segmentation_feed.get_attribute("src")
assert initial_src != new_src, "Segmentation feed is not updating"

View File

@ -1,135 +0,0 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
import numpy as np
# Add the src/controllerSoftware directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware')))
# Mock the gi module
sys.modules['gi'] = MagicMock()
sys.modules['gi.repository'] = MagicMock()
from vision import VisionSystem, DeepStreamBackend, PythonBackend, MockBackend
class TestVisionSystem(unittest.TestCase):
"""
Unit tests for the VisionSystem class.
"""
def setUp(self):
"""
Set up a VisionSystem instance with a mocked backend for each test.
"""
self.config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"}
@patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend')
def test_initialization_linux(self, mock_backend_class, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Linux.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='Windows')
@patch('vision.DeepStreamBackend')
def test_initialization_windows(self, mock_backend_class, mock_system):
"""
Test that the VisionSystem initializes the DeepStreamBackend on Windows.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='Darwin')
@patch('vision.PythonBackend')
def test_initialization_macos(self, mock_backend_class, mock_system):
"""
Test that the VisionSystem initializes the PythonBackend on macOS.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='UnsupportedOS')
def test_initialization_unsupported(self, mock_system):
"""
Test that the VisionSystem raises an exception on an unsupported OS.
"""
with self.assertRaises(NotImplementedError):
VisionSystem(self.config)
@patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend')
def test_start(self, mock_backend_class, mock_system):
"""
Test that the start method calls the backend's start method.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
vision_system.start()
mock_backend_instance.start.assert_called_once()
@patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend')
def test_stop(self, mock_backend_class, mock_system):
"""
Test that the stop method calls the backend's stop method.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
vision_system.stop()
mock_backend_instance.stop.assert_called_once()
@patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend')
def test_get_pupil_data(self, mock_backend_class, mock_system):
"""
Test that the get_pupil_data method calls the backend's get_pupil_data method.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
vision_system.get_pupil_data()
mock_backend_instance.get_pupil_data.assert_called_once()
@patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend')
def test_get_annotated_frame(self, mock_backend_class, mock_system):
"""
Test that the get_annotated_frame method calls the backend's get_annotated_frame method.
"""
mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config)
vision_system.get_annotated_frame()
mock_backend_instance.get_annotated_frame.assert_called_once()
def test_mock_backend_methods(self):
"""
Test the methods of the MockBackend.
"""
backend = MockBackend(self.config)
backend.start()
backend.stop()
data = backend.get_pupil_data()
self.assertIn("pupil_position", data)
frame = backend.get_annotated_frame()
self.assertIsInstance(frame, np.ndarray)
def test_model_exists(self):
"""
Tests that the YOLO model file (.pt) exists at the expected location.
"""
model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware', self.config['model_path']))
self.assertTrue(os.path.exists(model_path), f"YOLO model file not found at {model_path}")