Compare commits


24 Commits

Author SHA1 Message Date
1f8da0017c feat: Integrate pupil segmentation infrastructure
This commit introduces the necessary infrastructure for integrating pupil segmentation into the mono camera pipelines.

Key changes include:
- Modifying `gstreamer_pipeline.py` to add a tee element to split mono camera streams, creating a dedicated branch for segmentation output with a placeholder `videoconvert` element and `appsink`. This also includes new callbacks and data structures to handle the segmentation frames.
- Adding a new Flask route `/segmentation_feed/<int:stream_id>` to `app.py` to serve the segmentation video stream to the frontend.
- Updating `index.html` to display the new segmentation feed and implementing cache-busting for all video streams.
- Introducing `test_segmentation.py` to verify the functionality of the new segmentation feed.
- Refining existing UI and visual tests by updating locators and fixing indentation errors to accommodate the new segmentation feature and maintain test stability.
2025-12-11 17:52:08 +07:00
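The tee-based split described in the first bullet can be sketched as a pipeline-string builder. This is a minimal sketch only: `build_mono_branch` and the `videotestsrc` stand-in are illustrative assumptions, while the `t_{i}` and `seg_sink_{i}` element names follow the naming used elsewhere in this changeset.

```python
def build_mono_branch(i: int, width: int = 1280, height: int = 720) -> str:
    """Sketch of one mono-camera branch: a tee feeds the normal JPEG
    appsink plus a placeholder segmentation branch."""
    return (
        f"videotestsrc is-live=true ! "
        f"video/x-raw,width={width},height={height} ! "
        f"tee name=t_{i} "
        # Branch 1: the existing display feed
        f"t_{i}. ! queue ! jpegenc ! appsink name=sink_{i} "
        # Branch 2: placeholder segmentation output; videoconvert stands in
        # for a future DeepStream inference element
        f"t_{i}. ! queue ! videoconvert ! jpegenc ! appsink name=seg_sink_{i}"
    )
```

Each branch needs its own `queue` so the two appsinks do not stall each other.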
7d3dbc229d Merge branch 'Unity' into dev 2025-12-03 14:33:13 +07:00
15b6f3626d feat(web-ui): Implement custom numerical input with [-] [0] [+] buttons. Replaced default up/down adjustment buttons for numerical inputs in the Lamp Matrix Control with a custom [-] [0] [+] button layout, including HTML structure, CSS styling, and JavaScript logic. 2025-12-03 14:26:09 +07:00
df2f44857b fix(web-ui): Eliminate tiny viewport scroll by refining vertical spacing
Addressed the persistent "tiny bit" of viewport overflow reported by the user.

- **Removed `h1` margins:** Set `margin: 0;` on the `h1` element to prevent its
  margins from contributing to unexpected layout shifts.
- **Centralized vertical spacing on `body`:** Managed overall top/bottom spacing
  with `padding-top: 20px; padding-bottom: 20px;` on the `body`.
- **Introduced flex `gap` for vertical separation:** Used `gap: 20px;` on the `body`
  (as a flex container) to precisely control the spacing between the `h1` and
  `.main-container`.
- **Ensured correct box-sizing for `body`:** Explicitly set `box-sizing: border-box;`
  on `body` to include its padding within the `100vh` height calculation,
  guaranteeing exact fit.

These adjustments collectively ensure the entire interface fits perfectly within
the `100vh` viewport without any overflow.
2025-12-03 12:37:24 +07:00
5e7f874bfd feat(web-ui): Constrain camera feed section and Basler streams to viewport height
Addressed user feedback to prevent the camera feed section and its internal
streams from overflowing the viewport.

- **Explicitly constrained camera-view height:** Ensured `.camera-view` utilizes
  `height: 100%` and `overflow-y: auto` to fit within its parent (`.main-container`)
  and allow internal scrolling if content is too tall.
- **Refined individual stream container sizing:** Removed fixed `height: 100%`
  from `.camera-container-individual` and re-enabled `max-width: 100%`. This,
  combined with `aspect-ratio` and `max-height: 100%`, allows individual camera
  streams to scale correctly within their allocated grid cells without causing
  overflow.
- **Ensured grid row containment:** Applied `height: 100%` and `overflow: hidden`
  to `.camera-color-row` and `.camera-mono-row` to tightly constrain content
  within grid rows.
2025-12-03 12:29:23 +07:00
43019286cf fix(web-ui): Refine LMC spacing and ensure consistent control widths
Addressed user feedback regarding precise layout adjustments for the Lamp
Matrix Control (LMC) interface.

- **Removed all explicit padding:** Removed `padding` from `.main-container`
  and `padding-right` from `.lamp-view` to make content flush with browser edges
  as per new instructions.
- **Unified Control Widths:** Explicitly set `width: 470px` for `.region-control`,
  `.control-panel`, and `.center-lamp-control` to ensure they precisely match
  the calculated 470px width of the `.matrix-grid`. This creates visual
  consistency and horizontal alignment across all LMC components.
- **Centered LMC Components:** Ensured all LMC components are horizontally
  centered within the `.lamp-view` by setting `align-items: center` on
  `.lamp-view .main-content`.
2025-12-03 11:45:03 +07:00
b4793ca585 fix(web-ui): Correct Lamp Matrix visual and layout responsiveness
Resolved issues with the Lamp Matrix control being cropped and distorted,
and failing to maintain a fixed 5x5 layout.

- **Fixed 5x5 Matrix:** Reverted `.matrix-grid` to `grid-template-columns: repeat(5, 70px)`
  to enforce a consistent 5x5 lamp layout.
- **Prevented Warping:** Removed `width: 100%` from `.matrix-grid` to allow its
  width to be intrinsically determined by its content, preventing distortion.
- **Ensured Adequate Space:** Set `.lamp-view` to `flex: 0 0 auto` with a
  `min-width: 480px` to guarantee sufficient space for the fixed matrix and
  controls without cropping.
- **Responsive Control Panels:** Applied `max-width: 470px` to `.control-panel`
  and `.center-lamp-control` to align their size with the matrix grid while
  maintaining responsiveness in smaller viewports.
- **Full Screen Utilization:** Ensured the overall application expands to fill
  the browser window by removing `max-width` from `.main-container`.
2025-12-03 10:55:11 +07:00
84e1e895ae feat(web-ui): Implement responsive camera stream layout and styling
This commit introduces a refined layout and styling for the camera streams
in the unified web UI. Key changes include:

- **Responsive Grid Layout:** Implemented a CSS Grid-based layout for camera feeds,
  ensuring the color camera occupies 1/3 height and mono cameras 2/3 height.
- **Improved Stream Fit:** Adjusted CSS to ensure individual camera streams
  (visual containers) perfectly fit within their designated borders without
  cropping, distortion, or excessive transparent space, addressing user feedback
  regarding "zoomed out" or ill-fitting streams.
- **Aesthetic Enhancements:** Removed black backgrounds from stream containers
  and applied corner radii for a modern look.
- **Padding Adjustments:** Optimized padding to prevent UI elements from appearing
  cramped while maintaining visual separation.
- **New Tests:** Added robust visual tests (`test_visual.py`) to programmatically
  verify layout correctness and ensure tight fitting of camera feeds within
  their containers.
- **Dependency Updates:** Updated `requirements.txt` to reflect new and
  changed Python dependencies.
- **Test Runner & Gitignore:** Included a `run_tests.sh` script and updated
  `.gitignore` to properly ignore virtual-environment artifacts.
2025-12-03 10:45:01 +07:00
c9c8cb7df7 revert: Revert controllerSoftware to commit 6a21816e42 2025-12-02 21:54:09 +07:00
97c7772a4c Revert "Changed color camera back to color, set all cameras to 20ms shutter speed"
This reverts commit 413590d1a2.
2025-12-02 07:57:20 +07:00
413590d1a2 Changed color camera back to color, set all cameras to 20ms shutter speed 2025-12-01 15:42:16 +07:00
35ddf9f844 3 cameras confirmed available 2025-12-01 15:25:57 +07:00
d12931641b Changed to adaptive camera count 2025-12-01 11:55:51 +07:00
4a46b12c05 Stable Dual Camera setup 2025-12-01 11:51:45 +07:00
d11288165b AVC encoding implemented, now optimizing 2025-12-01 10:11:10 +07:00
19fcdd6c9f Proper res, low framerate due to JPEG encoding 2025-12-01 09:59:22 +07:00
7af789a1d6 First camera feed success 2025-12-01 09:42:34 +07:00
da4f7073dc test python script 2025-12-01 09:08:39 +07:00
17d691173b Temporary Commit 2025-11-28 16:18:30 +07:00
Tempest
7d5b283dd3 Update .gitignore to exclude .DS_Store files 2025-11-28 11:01:55 +07:00
40b9b2c8d2 feat: Add pupil detection and camera stream to UI
- Add a new section to the web UI to display pupil detection data and a live camera stream with YOLO segmentation.
- Add a /video_feed endpoint to stream the annotated camera feed.
- Update the VisionSystem to support onnxruntime-gpu with a fallback to CPU.
- Add logging to indicate which backend is being used.
- Refactor the test suite to accommodate the new features and fix existing tests.
2025-11-28 08:29:17 +07:00
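The onnxruntime-gpu fallback described above can be sketched as a small provider-selection helper. The name `select_providers` is hypothetical; the real VisionSystem presumably passes the result to `onnxruntime.InferenceSession(..., providers=...)` and logs which backend was chosen.

```python
def select_providers(available):
    """Prefer the CUDA execution provider when onnxruntime reports it,
    otherwise fall back to the CPU provider."""
    preferred = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    chosen = [p for p in preferred if p in available]
    # Always return a usable list, even if 'available' is empty or unexpected
    return chosen or ["CPUExecutionProvider"]
```

In practice the `available` list would come from `onnxruntime.get_available_providers()`.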
8aebeea6ee Resolve merge conflict in app.py and integrate vision system 2025-11-27 22:23:57 +07:00
60fa88926f Add new files and update existing ones 2025-11-27 22:22:56 +07:00
Tempest
6a21816e42 Ready for delivery 2025-10-04 19:11:03 +07:00
28 changed files with 3399 additions and 416 deletions

.gitignore vendored Normal file

@@ -0,0 +1,13 @@
# Virtual Environment
.venv/
init/
# Python cache
__pycache__/
*.pyc
# Test artifacts
app_stdout.log
app_stderr.log
screenshots/
.DS_Store

GEMINI.md Normal file

@@ -0,0 +1,23 @@
### Pupil Segmentation Integration
- **Objective:** Integrated Pupil segmentation into the mono camera pipelines.
- **Key Changes:**
- Modified `src/unified_web_ui/gstreamer_pipeline.py` to:
- Add a `tee` element for mono camera streams to split the video feed.
- Create a new branch for pupil segmentation with a `videoconvert` placeholder and a dedicated `appsink` (`seg_sink_{i}`).
- Implement `on_new_seg_sample_factory` callback to handle segmentation data.
- Added `seg_frame_buffers` and `seg_buffer_locks` for segmentation output.
- Introduced `get_seg_frame_by_id` to retrieve segmentation frames.
- Ensured unique naming for `tee` elements (`t_{i}`) in the GStreamer pipeline to prevent linking errors.
- Modified `src/unified_web_ui/app.py` to:
- Add a new Flask route `/segmentation_feed/<int:stream_id>` to serve the segmentation video stream.
- Added `datetime.utcnow` to the Jinja2 context for cache-busting in templates.
- Modified `src/unified_web_ui/templates/index.html` to:
- Include a new "Segmentation Feed" section displaying the segmentation video streams, sourcing from `/segmentation_feed/` with cache-busting timestamps.
- Updated existing video feeds (`video_feed`) with cache-busting timestamps for consistency.
- **Testing:**
- Created `tests/test_segmentation.py` to verify the segmentation feed is visible and updating.
- Updated `src/unified_web_ui/tests/test_ui.py` to refine locators (`#camera .camera-streams-grid .camera-container-individual`) for camera stream elements, resolving conflicts with segmentation feeds.
- Updated `src/unified_web_ui/tests/test_visual.py` to refine locators (`#camera .camera-mono-row`, `#camera .camera-color-row`, `#camera .camera-mono`) to prevent strict mode violations and ensure accurate targeting of camera layout elements.
- Fixed indentation errors in `src/unified_web_ui/tests/test_visual.py`.
- **Status:** All tests are passing, and the infrastructure for pupil segmentation is in place, awaiting the integration of a DeepStream model.
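The per-stream callback factory mentioned above (`on_new_seg_sample_factory`) can be sketched in plain Python, with raw frame bytes standing in for the mapped GStreamer buffer. This is a sketch only: the real callback pulls a `Gst.Sample` from the appsink and maps its buffer before storing it.

```python
import threading

def on_new_seg_sample_factory(i, seg_frame_buffers, seg_buffer_locks):
    """Each mono stream i gets its own closure that stores the latest
    segmentation frame under that stream's lock."""
    def on_new_seg_sample(frame_bytes):
        with seg_buffer_locks[i]:
            seg_frame_buffers[i] = frame_bytes
    return on_new_seg_sample
```

The factory pattern avoids the classic late-binding pitfall of capturing a loop variable `i` directly in the callback.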


@@ -3,3 +3,26 @@
## Introduction
This repository houses programs and documents related to the Pupilometer project by the Vietnam Academy of Science and Technology. The project aims to establish a benchmark and research into how the interaction between light intensity and temperature affects eye strain disorders.
## Dependencies
### Python Dependencies
The Python dependencies are listed in the `requirements.txt` file. You can install them using pip:
```bash
pip install -r requirements.txt
```
### NVIDIA DeepStream
For running the pupil segmentation on a Jetson Orin AGX or a Windows machine with an NVIDIA GPU, this project uses NVIDIA DeepStream. DeepStream is a complex dependency and cannot be installed via pip.
Please follow the official NVIDIA documentation to install DeepStream for your platform:
* **Jetson:** [DeepStream for Jetson](https://developer.nvidia.com/deepstream-sdk-jetson)
* **Windows:** [DeepStream for Windows](https://developer.nvidia.com/deepstream-sdk-windows)
You will also need to install GStreamer and the Python bindings (PyGObject). These are usually installed as part of the DeepStream installation.
Additionally, the `pyds` library, which provides Python bindings for DeepStream metadata structures, is required. This library is also included with the DeepStream SDK and may need to be installed manually.


@@ -1,2 +1,146 @@
bleak>="1.0.0"
flask>="3.1.1"
appdirs==1.4.4
apturl==0.5.2
async-timeout==5.0.1
attrs==21.2.0
bcrypt==3.2.0
beniget==0.4.1
bleak==2.0.0
blinker==1.9.0
Brlapi==0.8.3
Brotli==1.0.9
certifi==2020.6.20
chardet==4.0.0
charset-normalizer==3.4.4
click==8.3.1
colorama==0.4.4
coloredlogs==15.0.1
contourpy==1.3.2
cpuset==1.6
cryptography==3.4.8
cupshelpers==1.0
cycler==0.11.0
dbus-fast==3.1.2
dbus-python==1.2.18
decorator==4.4.2
defer==1.0.6
distro==1.7.0
distro-info==1.1+ubuntu0.2
duplicity==0.8.21
exceptiongroup==1.3.1
fasteners==0.14.1
filelock==3.20.0
Flask==3.1.2
flatbuffers==25.9.23
fonttools==4.29.1
fs==2.4.12
fsspec==2025.10.0
future==0.18.2
gast==0.5.2
greenlet==3.2.4
httplib2==0.20.2
humanfriendly==10.0
idna==3.3
importlib-metadata==4.6.4
iniconfig==2.3.0
itsdangerous==2.2.0
jeepney==0.7.1
Jetson.GPIO==2.1.7
Jinja2==3.1.6
keyring==23.5.0
kiwisolver==1.3.2
language-selector==0.1
launchpadlib==1.10.16
lazr.restfulclient==0.14.4
lazr.uri==1.0.6
lockfile==0.12.2
louis==3.20.0
lxml==4.8.0
lz4==3.1.3+dfsg
macaroonbakery==1.3.1
Mako==1.1.3
MarkupSafe==3.0.3
matplotlib==3.5.1
meson==1.9.1
ml_dtypes==0.5.4
monotonic==1.6
more-itertools==8.10.0
mpmath==1.3.0
networkx==3.4.2
ninja==1.13.0
numpy==2.2.6
oauthlib==3.2.0
olefile==0.46
onboard==1.4.1
onnx==1.20.0
onnxruntime==1.23.2
onnxslim==0.1.77
opencv-python==4.12.0.88
packaging==25.0
pandas==1.3.5
paramiko==2.9.3
pexpect==4.8.0
Pillow==9.0.1
playwright==1.56.0
pluggy==1.6.0
ply==3.11
polars==1.35.2
polars-runtime-32==1.35.2
protobuf==6.33.1
psutil==7.1.3
ptyprocess==0.7.0
pycairo==1.20.1
pycups==2.0.1
pyee==13.0.0
Pygments==2.19.2
PyGObject==3.42.1
PyJWT==2.3.0
pymacaroons==0.13.0
PyNaCl==1.5.0
PyOpenGL==3.1.5
pyparsing==2.4.7
pypylon==4.2.0
pyRFC3339==1.1
pyservicemaker @ file:///opt/nvidia/deepstream/deepstream-7.1/service-maker/python/pyservicemaker-0.0.1-py3-none-linux_aarch64.whl
pytest==9.0.1
pytest-base-url==2.1.0
pytest-playwright==0.7.2
python-apt==2.4.0+ubuntu4
python-dateutil==2.8.1
python-dbusmock==0.27.5
python-debian==0.1.43+ubuntu1.1
python-slugify==8.0.4
pythran==0.10.0
pytz==2022.1
pyxdg==0.27
PyYAML==6.0.3
requests==2.25.1
scipy==1.8.0
seaborn==0.13.2
SecretStorage==3.3.1
six==1.16.0
SQLAlchemy==2.0.44
sympy==1.14.0
systemd-python==234
text-unidecode==1.3
thop==0.1.1.post2209072238
tomli==2.3.0
torch==2.9.1
torchaudio==2.9.1
torchvision==0.24.1
tqdm==4.67.1
typing_extensions==4.15.0
ubuntu-advantage-tools==8001
ubuntu-drivers-common==0.0.0
ufoLib2==0.13.1
ultralytics==8.3.233
ultralytics-thop==2.0.18
unicodedata2==14.0.0
urllib3==1.26.5
urwid==2.1.2
uv==0.9.13
wadllib==1.3.6
websockets==15.0.1
Werkzeug==3.1.4
xdg==5
xkit==0.0.0
zipp==1.0.0

run.ps1 Normal file

@@ -0,0 +1,8 @@
# Activate the virtual environment
. .\.venv\Scripts\Activate.ps1
# Install dependencies
pip install -r requirements.txt
# Run the Flask application
python src/controllerSoftware/app.py

run.sh Executable file

@@ -0,0 +1,4 @@
#!/bin/bash
source .venv/bin/activate
pip install -r requirements.txt
python src/controllerSoftware/app.py

run_tests.sh Executable file

@@ -0,0 +1,59 @@
#!/bin/bash
# Start the Flask application in the background
python src/unified_web_ui/app.py &
APP_PID=$!
# Wait for the application to start
echo "Waiting for application to start..."
sleep 10
# Check if the application is running
if ! ps -p $APP_PID > /dev/null
then
echo "Application failed to start."
exit 1
fi
# Run the curl tests
echo "Running curl tests..."
http_code=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/)
echo "Main page status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "Main page test failed."
fi
http_code=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/get_fps)
echo "get_fps status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "get_fps test failed."
fi
matrix_data='{"matrix":['
for i in {1..5}; do
matrix_data+='['
for j in {1..5}; do
matrix_data+='{"ww":0,"cw":0,"blue":0}'
if [ $j -lt 5 ]; then
matrix_data+=','
fi
done
matrix_data+=']'
if [ $i -lt 5 ]; then
matrix_data+=','
fi
done
matrix_data+=']}'
http_code=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H "Content-Type: application/json" -d "$matrix_data" http://localhost:5000/set_matrix)
echo "set_matrix status code: $http_code"
if [ "$http_code" != "200" ]; then
echo "set_matrix test failed."
fi
# Run the pytest tests
echo "Running pytest tests..."
pytest src/unified_web_ui/tests/
# Kill the Flask application
kill $APP_PID
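The shell loop above that assembles the `set_matrix` payload is equivalent to this Python sketch (the helper name `build_matrix_payload` is hypothetical):

```python
import json

def build_matrix_payload(rows: int = 5, cols: int = 5) -> str:
    """Build the same JSON body the shell loop produces: a rows x cols
    matrix of all-off lamps, each with ww/cw/blue channels."""
    matrix = [[{"ww": 0, "cw": 0, "blue": 0} for _ in range(cols)]
              for _ in range(rows)]
    return json.dumps({"matrix": matrix})
```

Either form can be POSTed to `/set_matrix` with `Content-Type: application/json`.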


@@ -14,7 +14,7 @@ import os
# Set to True to run without a physical BLE device for testing purposes.
# Set to False to connect to the actual lamp matrix.
-DEBUG_MODE = True
+DEBUG_MODE = False
# --- BLE Device Configuration (Ignored in DEBUG_MODE) ---
DEVICE_NAME = "Pupilometer LED Billboard"
@@ -84,17 +84,17 @@ async def set_full_matrix_on_ble(colorSeries):
# =====================================================================
# SNIPPET TO PATCH SWAPPED LAMP POSITIONS
# =====================================================================
-print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
+#print("Patching lamp positions 3 <-> 7 and 12 <-> 24.")
# Swap data for lamps at positions 3 and 7
-temp_color_3 = colorSeries[3]
-colorSeries[3] = colorSeries[7]
-colorSeries[7] = temp_color_3
+#temp_color_3 = colorSeries[3]
+#colorSeries[3] = colorSeries[7]
+#colorSeries[7] = temp_color_3
# Swap data for lamps at positions 12 and 24
-temp_color_12 = colorSeries[12]
-colorSeries[12] = colorSeries[24]
-colorSeries[24] = temp_color_12
+#temp_color_12 = colorSeries[12]
+#colorSeries[12] = colorSeries[24]
+#colorSeries[24] = temp_color_12
# =====================================================================
if DEBUG_MODE:


@@ -0,0 +1,267 @@
import sys
import subprocess
import threading
import time
import gc
import json
from flask import Flask, Response, render_template_string, jsonify
# --- CONFIGURATION ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- PART 1: DETECTION ---
def scan_connected_cameras():
    print("--- Scanning for Basler Cameras ---")
    detection_script = """
import sys
try:
    from pypylon import pylon
    tl_factory = pylon.TlFactory.GetInstance()
    devices = tl_factory.EnumerateDevices()
    if not devices:
        print("NONE")
    else:
        serials = [d.GetSerialNumber() for d in devices]
        cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
        cam.Open()
        try:
            cam.BinningHorizontal.Value = 2
            cam.BinningVertical.Value = 2
            w = cam.Width.GetValue()
            h = cam.Height.GetValue()
            cam.BinningHorizontal.Value = 1
            cam.BinningVertical.Value = 1
            supported = 1
        except Exception:
            w = cam.Width.GetValue()
            h = cam.Height.GetValue()
            supported = 0
        cam.Close()
        print(f"{','.join(serials)}|{w}|{h}|{supported}")
except Exception:
    print("NONE")
"""
    try:
        result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
        output = result.stdout.strip()
        if "NONE" in output or not output:
            return [], DEFAULT_W, DEFAULT_H, False
        parts = output.split('|')
        return parts[0].split(','), int(parts[1]), int(parts[2]), (parts[3] == '1')
    except Exception:
        return [], DEFAULT_W, DEFAULT_H, False

DETECTED_SERIALS, CAM_W, CAM_H, BINNING_SUPPORTED = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_SERIALS)
# --- RESOLUTION & LAYOUT ---
INTERNAL_WIDTH = 1280
if ACTUAL_CAMS_COUNT > 0:
    scale = INTERNAL_WIDTH / CAM_W
    INTERNAL_HEIGHT = int(CAM_H * scale)
else:
    INTERNAL_HEIGHT = 720
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT} Cams")
# --- FLASK & GSTREAMER ---
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
app = Flask(__name__)
frame_buffer = None
buffer_lock = threading.Lock()
current_fps = 0.0
frame_count = 0
start_time = time.time()
class GStreamerPipeline(threading.Thread):
    def __init__(self):
        super().__init__()
        self.loop = GLib.MainLoop()
        self.pipeline = None

    def run(self):
        Gst.init(None)
        self.build_pipeline()
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()
        except Exception as e:
            print(f"Error: {e}")
        finally:
            self.pipeline.set_state(Gst.State.NULL)

    def on_new_sample(self, sink):
        global frame_count, start_time, current_fps
        sample = sink.emit("pull-sample")
        if not sample: return Gst.FlowReturn.ERROR
        frame_count += 1
        # Calculate FPS every 30 frames
        if frame_count % 30 == 0:
            elapsed = time.time() - start_time
            current_fps = 30 / elapsed if elapsed > 0 else 0
            start_time = time.time()
        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success: return Gst.FlowReturn.ERROR
        global frame_buffer
        with buffer_lock:
            frame_buffer = bytes(map_info.data)
        buffer.unmap(map_info)
        return Gst.FlowReturn.OK

    def build_pipeline(self):
        # 1. CAMERA SETTINGS
        # Note: We run cameras at 60 FPS for internal stability
        cam_settings = (
            "cam::TriggerMode=Off "
            "cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 "
            "cam::ExposureAuto=Off "
            "cam::ExposureTime=20000.0 "
            "cam::GainAuto=Continuous "
            "cam::DeviceLinkThroughputLimitMode=Off "
        )
        if BINNING_SUPPORTED:
            cam_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
        sources_str = ""
        for i in range(TARGET_NUM_CAMS):
            if i < len(DETECTED_SERIALS):
                # --- REAL CAMERA SOURCE ---
                serial = DETECTED_SERIALS[i]
                print(f"Slot {i}: Linking Camera {serial}")
                pre_scale = (
                    "nvvideoconvert compute-hw=1 ! "
                    f"video/x-raw(memory:NVMM), format=NV12, width={INTERNAL_WIDTH}, height={INTERNAL_HEIGHT}, framerate=60/1 ! "
                )
                source = (
                    f"pylonsrc device-serial-number={serial} {cam_settings} ! "
                    "video/x-raw,format=GRAY8 ! "
                    "videoconvert ! "
                    "video/x-raw,format=I420 ! "
                    "nvvideoconvert compute-hw=1 ! "
                    "video/x-raw(memory:NVMM) ! "
                    f"{pre_scale}"
                    f"m.sink_{i} "
                )
            else:
                # --- DISCONNECTED PLACEHOLDER ---
                print(f"Slot {i}: Creating Placeholder (Synchronized)")
                # FIX 1: Add 'videorate' to enforce strict timing on the fake source
                # This prevents the placeholder from running too fast/slow and jittering the muxer
                source = (
                    f"videotestsrc pattern=black is-live=true ! "
                    f"videorate ! "  # <--- TIMING ENFORCER
                    f"video/x-raw,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},format=I420,framerate=60/1 ! "
                    f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
                    "nvvideoconvert compute-hw=1 ! "
                    f"video/x-raw(memory:NVMM),format=NV12,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},framerate=60/1 ! "
                    f"m.sink_{i} "
                )
            sources_str += source
        # 3. MUXER & PROCESSING
        # FIX 2: batched-push-timeout=33000
        # This tells the muxer: "If you have data, send it every 33ms (30fps). Don't wait forever."
        # FIX 3: Output Videorate
        # We process internally at 60fps (best for camera driver), but we DROP to 30fps
        # for the web stream. This makes the network stream buttery smooth and consistent.
        processing = (
            f"nvstreammux name=m batch-size={TARGET_NUM_CAMS} width={INTERNAL_WIDTH} height={INTERNAL_HEIGHT} "
            f"live-source=1 batched-push-timeout=33000 ! "  # <--- TIMEOUT FIX
            f"nvmultistreamtiler width={WEB_WIDTH} height={WEB_HEIGHT} rows=1 columns={TARGET_NUM_CAMS} ! "
            "nvvideoconvert compute-hw=1 ! "
            "video/x-raw(memory:NVMM) ! "
            "videorate drop-only=true ! "  # <--- DROPPING FRAMES CLEANLY
            "video/x-raw(memory:NVMM), framerate=30/1 ! "  # <--- Force 30 FPS Output
            f"nvjpegenc quality=60 ! "
            "appsink name=sink emit-signals=True sync=False max-buffers=1 drop=True"
        )
        pipeline_str = f"{sources_str} {processing}"
        print("Launching SMOOTH Pipeline...")
        self.pipeline = Gst.parse_launch(pipeline_str)
        appsink = self.pipeline.get_by_name("sink")
        appsink.connect("new-sample", self.on_new_sample)
# --- FLASK ---
@app.route('/')
def index():
    return render_template_string('''
<html>
<head>
<style>
body { background-color: #111; color: white; text-align: center; font-family: monospace; margin: 0; padding: 20px; }
.container { position: relative; display: inline-block; border: 3px solid #4CAF50; }
img { display: block; max-width: 100%; height: auto; }
.hud {
    position: absolute; top: 10px; left: 10px;
    background: rgba(0, 0, 0, 0.6); color: #00FF00;
    padding: 5px 10px; font-weight: bold; pointer-events: none;
}
</style>
</head>
<body>
<h1>Basler 3-Cam (Smooth)</h1>
<div class="container">
    <div class="hud" id="fps-counter">FPS: --</div>
    <img src="{{ url_for('video_feed') }}">
</div>
<script>
setInterval(function() {
    fetch('/get_fps').then(r => r.json()).then(d => {
        document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
    });
}, 500);
</script>
</body>
</html>
''')

@app.route('/video_feed')
def video_feed():
    def generate():
        count = 0
        while True:
            with buffer_lock:
                if frame_buffer:
                    yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame_buffer + b'\r\n')
            # Sleep 33ms (30 FPS)
            time.sleep(0.033)
            count += 1
            if count % 200 == 0: gc.collect()
    return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/get_fps')
def get_fps():
    return jsonify(fps=round(current_fps, 1))

if __name__ == "__main__":
    subprocess.run([sys.executable, "-c", "import gc; gc.collect()"])
    gst_thread = GStreamerPipeline()
    gst_thread.daemon = True
    gst_thread.start()
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
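The `multipart/x-mixed-replace` framing yielded by `video_feed` can be isolated into a small helper. This is a sketch only; `mjpeg_part` is not part of the codebase, it just makes the per-frame byte layout explicit.

```python
def mjpeg_part(jpeg_bytes: bytes, boundary: bytes = b"frame") -> bytes:
    """One part of the MJPEG stream: boundary line, Content-Type header,
    blank line, JPEG payload, trailing CRLF."""
    return (b"--" + boundary + b"\r\n"
            b"Content-Type: image/jpeg\r\n\r\n" + jpeg_bytes + b"\r\n")
```

The browser replaces the displayed image each time a new part arrives, which is why a plain `<img>` tag is enough on the client side.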


@@ -0,0 +1,58 @@
from pypylon import pylon
import time
import sys

try:
    # Get the Transport Layer Factory
    tl_factory = pylon.TlFactory.GetInstance()
    devices = tl_factory.EnumerateDevices()
    if not devices:
        print("No cameras found!")
        sys.exit(1)
    print(f"Found {len(devices)} cameras. Checking Camera 1...")
    # Connect to first camera
    cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
    cam.Open()
    # 1. Reset to Defaults
    print("Resetting to Defaults...")
    cam.UserSetSelector.Value = "Default"
    cam.UserSetLoad.Execute()
    # 2. Enable Auto Exposure/Gain
    print("Enabling Auto Exposure & Gain...")
    cam.ExposureAuto.Value = "Continuous"
    cam.GainAuto.Value = "Continuous"
    # 3. Wait for it to settle (Camera adjusts to light)
    print("Waiting 3 seconds for auto-adjustment...")
    for i in range(3):
        print(f"{3-i}...")
        time.sleep(1)
    # 4. READ VALUES
    current_exposure = cam.ExposureTime.GetValue()  # In Microseconds (us)
    current_fps_readout = cam.ResultingFrameRate.GetValue()
    print("-" * 30)
    print(f"REPORT FOR SERIAL: {cam.GetDeviceInfo().GetSerialNumber()}")
    print("-" * 30)
    print(f"Current Exposure Time: {current_exposure:.1f} us ({current_exposure/1000:.1f} ms)")
    print(f"Theoretical Max FPS: {1000000 / current_exposure:.1f} FPS")
    print(f"Camera Internal FPS: {current_fps_readout:.1f} FPS")
    print("-" * 30)
    if current_exposure > 33000:
        print("⚠️ PROBLEM FOUND: Exposure is > 33ms.")
        print("   This physically prevents the camera from reaching 30 FPS.")
        print("   Solution: Add more light or limit AutoExposureUpperLimit.")
    else:
        print("✅ Exposure looks fast enough for 30 FPS.")
    cam.Close()
except Exception as e:
    print(f"Error: {e}")
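The exposure check at the end of the script rests on a simple relation: a frame cannot complete faster than its exposure, so 1,000,000 / exposure_us bounds the achievable frame rate (the helper name `max_fps_for_exposure` is illustrative):

```python
def max_fps_for_exposure(exposure_us: float) -> float:
    """Theoretical frame-rate ceiling implied by exposure time alone.
    At 20,000 us (20 ms) the ceiling is 50 FPS; above ~33,333 us the
    camera can no longer reach 30 FPS."""
    return 1_000_000 / exposure_us
```

This is why the script warns when exposure exceeds 33 ms: readout and transport overhead only push the real rate further below this bound.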


@@ -0,0 +1,33 @@
# Unified WebUI
This application combines the functionality of the `detectionSoftware` and `controllerSoftware` into a single, unified web interface.
## Features
- **Camera View:** Displays a tiled video stream from multiple Basler cameras.
- **Lamp Control:** Provides a web interface to control a 5x5 LED matrix via Bluetooth Low Energy (BLE).
- **Responsive UI:** The UI is designed to work on both desktop and mobile devices. On desktop, the lamp control and camera view are displayed side-by-side. On mobile, they are in separate tabs.
## Setup
1. **Install dependencies:**
```bash
pip install -r requirements.txt
```
2. **Run the application:**
```bash
python src/unified_web_ui/app.py
```
3. **Open the web interface:**
Open a web browser and navigate to `http://<your-ip-address>:5000`.
## Modules
- **`app.py`:** The main Flask application file.
- **`ble_controller.py`:** Handles the BLE communication with the lamp matrix.
- **`camera_scanner.py`:** Scans for connected Basler cameras.
- **`gstreamer_pipeline.py`:** Creates and manages the GStreamer pipeline for video processing.
- **`templates/index.html`:** The main HTML template for the web interface.
- **`static/style.css`:** The CSS file for styling the web interface.

src/unified_web_ui/app.py Normal file

@@ -0,0 +1,226 @@
import sys
import subprocess
import threading
import time
import asyncio
import json
import signal
import os
from flask import Flask, Response, render_template, request, jsonify, g
from camera_scanner import scan_connected_cameras
from gstreamer_pipeline import GStreamerPipeline
from ble_controller import BLEController, get_spiral_address, SPIRAL_MAP_5x5, lampAmount
# =================================================================================================
# APP CONFIGURATION
# =================================================================================================
# --- Camera Configuration ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- BLE Device Configuration ---
DEVICE_NAME = "Pupilometer LED Billboard"
DEBUG_MODE = False # Set to True to run without a physical BLE device
# =================================================================================================
# INITIALIZATION
# =================================================================================================
# --- Camera Initialization ---
DETECTED_CAMS = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_CAMS)
# Sort cameras: color camera first, then mono cameras
# Assuming 'is_color' is a reliable flag
# If no color camera exists, the first mono will be at index 0.
detected_cams_sorted = sorted(DETECTED_CAMS, key=lambda x: x['is_color'], reverse=True)
if ACTUAL_CAMS_COUNT > 0:
    MASTER_W = detected_cams_sorted[0]['width']
    MASTER_H = detected_cams_sorted[0]['height']
else:
    MASTER_W = DEFAULT_W
    MASTER_H = DEFAULT_H
INTERNAL_WIDTH = 1280
scale = INTERNAL_WIDTH / MASTER_W
INTERNAL_HEIGHT = int(MASTER_H * scale)
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1  # Ensure even for some GStreamer elements
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT}")
for c in detected_cams_sorted:
    print(f" - Cam {c['serial']} ({c['model']}): {'COLOR' if c['is_color'] else 'MONO'}")
# --- Flask App Initialization ---
app = Flask(__name__)
# --- GStreamer Initialization ---
gst_thread = GStreamerPipeline(detected_cams_sorted, TARGET_NUM_CAMS, INTERNAL_WIDTH, INTERNAL_HEIGHT, WEB_WIDTH, WEB_HEIGHT)
gst_thread.daemon = True
gst_thread.start()
# --- BLE Initialization ---
ble_controller = BLEController(DEVICE_NAME, DEBUG_MODE)
ble_thread = None
if not DEBUG_MODE:
    ble_controller.ble_event_loop = asyncio.new_event_loop()
    ble_thread = threading.Thread(target=ble_controller.ble_event_loop.run_forever, daemon=True)
    ble_thread.start()
    future = asyncio.run_coroutine_threadsafe(ble_controller.connect(), ble_controller.ble_event_loop)
    try:
        future.result(timeout=10)
    except Exception as e:
        print(f"Failed to connect to BLE device: {e}")
        # Optionally, set DEBUG_MODE to True here if BLE connection is critical
        # DEBUG_MODE = True
# --- In-memory matrix for DEBUG_MODE ---
lamp_matrix = [['#000000' for _ in range(5)] for _ in range(5)]
# =================================================================================================
# COLOR MIXING
# =================================================================================================
def calculate_rgb(ww, cw, blue):
warm_white_r, warm_white_g, warm_white_b = 255, 192, 128
cool_white_r, cool_white_g, cool_white_b = 192, 224, 255
blue_r, blue_g, blue_b = 0, 0, 255
r = (ww / 255) * warm_white_r + (cw / 255) * cool_white_r + (blue / 255) * blue_r
g = (ww / 255) * warm_white_g + (cw / 255) * cool_white_g + (blue / 255) * blue_g
b = (ww / 255) * warm_white_b + (cw / 255) * cool_white_b + (blue / 255) * blue_b
r = int(min(255, round(r)))
g = int(min(255, round(g)))
b = int(min(255, round(b)))
return r, g, b
def rgb_to_hex(r, g, b):
r = int(max(0, min(255, r)))
g = int(max(0, min(255, g)))
b = int(max(0, min(255, b)))
return f'#{r:02x}{g:02x}{b:02x}'
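As a quick sanity check of the mixing math, here is a standalone restatement of the two helpers above (re-stated so the sketch runs on its own): each channel's 0-255 intensity scales a fixed reference color, the three contributions are summed per RGB component, and the result is clamped to 255.

```python
# Standalone restatement of the color-mixing helpers above, for illustration.
def calculate_rgb(ww, cw, blue):
    # Reference colors: warm white, cool white, and pure blue.
    warm_white = (255, 192, 128)
    cool_white = (192, 224, 255)
    blue_ref = (0, 0, 255)
    # Each channel contributes its reference color scaled by its 0-255 intensity.
    mixed = [
        (ww / 255) * w + (cw / 255) * c + (blue / 255) * b
        for w, c, b in zip(warm_white, cool_white, blue_ref)
    ]
    return tuple(int(min(255, round(v))) for v in mixed)

def rgb_to_hex(r, g, b):
    return f'#{r:02x}{g:02x}{b:02x}'

# Full warm white maps directly onto its reference color:
print(rgb_to_hex(*calculate_rgb(255, 0, 0)))  # → #ffc080
```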
# =================================================================================================
# FLASK ROUTES
# =================================================================================================
from datetime import datetime
@app.context_processor
def inject_now():
return {'now': datetime.utcnow}
@app.before_request
def before_request():
g.detected_cams_info = []
for cam in gst_thread.sorted_cams:
cam_copy = cam.copy()
if cam_copy['height'] > 0:
cam_copy['aspect_ratio'] = cam_copy['width'] / cam_copy['height']
else:
cam_copy['aspect_ratio'] = 16 / 9 # Default aspect ratio
g.detected_cams_info.append(cam_copy)
@app.route('/')
def index():
return render_template('index.html', matrix=lamp_matrix, detected_cams_info=g.detected_cams_info)
@app.route('/video_feed/<int:stream_id>')
def video_feed(stream_id):
def generate(stream_id):
while True:
frame = gst_thread.get_frame_by_id(stream_id)
if frame:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.016) # Roughly 60 fps
return Response(generate(stream_id), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/segmentation_feed/<int:stream_id>')
def segmentation_feed(stream_id):
def generate(stream_id):
while True:
frame = gst_thread.get_seg_frame_by_id(stream_id)
if frame:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.016) # Roughly 60 fps
return Response(generate(stream_id), mimetype='multipart/x-mixed-replace; boundary=frame')
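Both feeds use the same `multipart/x-mixed-replace` framing: each JPEG is wrapped between a boundary line and a `Content-Type` header, and the browser replaces the previous image as each part arrives. The per-frame framing, isolated as a minimal sketch:

```python
BOUNDARY = b'frame'  # must match the boundary declared in the response mimetype

def mjpeg_part(jpeg_bytes: bytes) -> bytes:
    """Wrap one JPEG frame as a single multipart/x-mixed-replace part."""
    return (b'--' + BOUNDARY + b'\r\n'
            b'Content-Type: image/jpeg\r\n\r\n' +
            jpeg_bytes + b'\r\n')

# Placeholder JPEG payload; in the routes above this comes from the appsink buffers.
part = mjpeg_part(b'\xff\xd8...\xff\xd9')
```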
@app.route('/get_fps')
def get_fps():
return jsonify(fps=gst_thread.get_fps())
@app.route('/set_matrix', methods=['POST'])
def set_matrix():
data = request.get_json()
full_matrix = data.get('matrix', [])
if len(full_matrix) != 5 or any(len(row) != 5 for row in full_matrix): # validate every row, not just the first
return jsonify(success=False, message="Invalid matrix data received"), 400
serial_colors = [b'\x00\x00\x00'] * lampAmount
try:
for row in range(5):
for col in range(5):
lamp_data = full_matrix[row][col]
ww = int(lamp_data['ww'])
cw = int(lamp_data['cw'])
blue = int(lamp_data['blue'])
color_bytes = bytes([ww, cw, blue])
spiral_pos = get_spiral_address(row, col, SPIRAL_MAP_5x5)
if spiral_pos != -1:
serial_colors[spiral_pos] = color_bytes
lampColorR, lampColorG, lampColorB = calculate_rgb(ww,cw,blue)
lamp_matrix[row][col] = rgb_to_hex(lampColorR, lampColorG, lampColorB)
if DEBUG_MODE:
return jsonify(success=True)
else:
asyncio.run_coroutine_threadsafe(
ble_controller.set_full_matrix(serial_colors),
ble_controller.ble_event_loop
)
return jsonify(success=True)
except Exception as e:
print(f"Error in set_matrix route: {e}")
return jsonify(success=False, message=str(e)), 500
# =================================================================================================
# APP SHUTDOWN
# =================================================================================================
def signal_handler(signum, frame):
print("Received shutdown signal, gracefully shutting down...")
if not DEBUG_MODE:
disconnect_future = asyncio.run_coroutine_threadsafe(ble_controller.disconnect(), ble_controller.ble_event_loop)
try:
disconnect_future.result(timeout=5)
except Exception as e:
print(f"Error during BLE disconnect: {e}")
if not DEBUG_MODE and ble_controller.ble_event_loop and ble_controller.ble_event_loop.is_running():
ble_controller.ble_event_loop.call_soon_threadsafe(ble_controller.ble_event_loop.stop)
if ble_thread: ble_thread.join(timeout=1) # ble_thread is None in DEBUG_MODE
os._exit(0)
# =================================================================================================
# APP STARTUP
# =================================================================================================
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True, use_reloader=False)


@ -0,0 +1,108 @@
import asyncio
from bleak import BleakScanner, BleakClient
# =================================================================================================
# BLE HELPER FUNCTIONS (Used in LIVE mode)
# =================================================================================================
lampAmount = 25
def create_spiral_map(n=5):
if n % 2 == 0:
raise ValueError("Matrix size must be odd for a unique center point.")
spiral_map = [[0] * n for _ in range(n)]
r, c = n // 2, n // 2
address = 0
spiral_map[r][c] = address
dr = [-1, 0, 1, 0]
dc = [0, 1, 0, -1]
direction = 0
segment_length = 1
steps = 0
while address < n * n - 1:
for _ in range(segment_length):
address += 1
r += dr[direction]
c += dc[direction]
if 0 <= r < n and 0 <= c < n:
spiral_map[r][c] = address
direction = (direction + 1) % 4
steps += 1
if steps % 2 == 0:
segment_length += 1
return spiral_map
def get_spiral_address(row, col, spiral_map):
n = len(spiral_map)
if 0 <= row < n and 0 <= col < n:
return spiral_map[row][col]
else:
return -1
SPIRAL_MAP_5x5 = create_spiral_map(5)
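The walk above starts at the center cell and spirals outward clockwise (up, right, down, left), growing the segment length every two turns; out-of-bounds steps at the end are simply skipped. Re-stated as a self-contained sketch so the resulting address map can be inspected:

```python
# Standalone restatement of create_spiral_map, for illustration.
def create_spiral_map(n=5):
    spiral_map = [[0] * n for _ in range(n)]
    r, c = n // 2, n // 2                   # start at the center cell
    address = 0
    spiral_map[r][c] = address
    dr, dc = [-1, 0, 1, 0], [0, 1, 0, -1]   # up, right, down, left
    direction, segment_length, steps = 0, 1, 0
    while address < n * n - 1:
        for _ in range(segment_length):
            address += 1
            r += dr[direction]
            c += dc[direction]
            if 0 <= r < n and 0 <= c < n:   # skip the final out-of-bounds steps
                spiral_map[r][c] = address
        direction = (direction + 1) % 4
        steps += 1
        if steps % 2 == 0:                  # segment grows every two turns
            segment_length += 1
    return spiral_map

# Center is address 0; addresses increase outward along the clockwise spiral.
for row in create_spiral_map(5):
    print(row)
```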
class BLEController:
def __init__(self, device_name, debug_mode=False):
self.device_name = device_name
self.debug_mode = debug_mode
self.ble_client = None
self.ble_characteristics = None
self.ble_event_loop = None
async def connect(self):
print(f"Scanning for device: {self.device_name}...")
devices = await BleakScanner.discover()
target_device = next((d for d in devices if d.name == self.device_name), None)
if not target_device:
print(f"Device '{self.device_name}' not found.")
return False
print(f"Found device: {target_device.name} ({target_device.address})")
try:
self.ble_client = BleakClient(target_device.address)
await self.ble_client.connect()
if self.ble_client.is_connected:
print(f"Connected to {target_device.name}")
services = [service for service in self.ble_client.services if service.handle != 1]
characteristics = [
char for service in services for char in service.characteristics
]
self.ble_characteristics = sorted(characteristics, key=lambda char: char.handle)
print(f"Found {len(self.ble_characteristics)} characteristics for lamps.")
return True
else:
print(f"Failed to connect to {target_device.name}")
return False
except Exception as e:
print(f"An error occurred during BLE connection: {e}")
return False
async def disconnect(self):
if self.ble_client and self.ble_client.is_connected:
await self.ble_client.disconnect()
print("BLE client disconnected.")
async def set_full_matrix(self, color_series):
if not self.ble_client or not self.ble_client.is_connected:
print("BLE client not connected. Attempting to reconnect...")
await self.connect()
if not self.ble_client or not self.ble_client.is_connected:
print("Failed to reconnect to BLE client.")
return
if self.debug_mode:
print(f"Constructed the following matrix data: {color_series}")
for i, char in enumerate(self.ble_characteristics):
value_to_write = color_series[i]
print(f"Setting Lamp {i} ({char.uuid}) to {value_to_write.hex()}")
await self.ble_client.write_gatt_char(char.uuid, value_to_write)
else:
value_to_write = b"".join([color for color in color_series])
print(f"Setting lamps to {value_to_write.hex()}")
await self.ble_client.write_gatt_char(self.ble_characteristics[0].uuid, value_to_write)
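In live (non-debug) mode all lamp colors go out as one GATT write: the 3-byte `(ww, cw, blue)` values are concatenated into a single 75-byte payload written to the first characteristic, with the device firmware assumed to fan the bytes out to the individual lamps. The payload packing, sketched on its own:

```python
LAMP_COUNT = 25  # matches lampAmount above

def pack_matrix_payload(color_series):
    """Concatenate per-lamp 3-byte (ww, cw, blue) values into one write payload."""
    assert len(color_series) == LAMP_COUNT
    return b"".join(color_series)

# Hypothetical colors: ww ramps up, cw off, blue ramps down.
colors = [bytes([i, 0, 255 - i * 10]) for i in range(LAMP_COUNT)]
payload = pack_matrix_payload(colors)
print(len(payload))  # → 75
```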


@ -0,0 +1,51 @@
import sys
import subprocess
def scan_connected_cameras():
print("--- Scanning for Basler Cameras ---")
detection_script = """
import sys
try:
from pypylon import pylon
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("NONE")
else:
results = []
for i in range(len(devices)):
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[i]))
cam.Open()
serial = cam.GetDeviceInfo().GetSerialNumber()
model = cam.GetDeviceInfo().GetModelName()
is_color = model.endswith("c") or "Color" in model
w = cam.Width.GetValue()
h = cam.Height.GetValue()
binning = 0
try:
cam.BinningHorizontal.Value = 2
cam.BinningVertical.Value = 2
cam.BinningHorizontal.Value = 1
cam.BinningVertical.Value = 1
binning = 1
except Exception: pass
current_fmt = cam.PixelFormat.GetValue()
cam.Close()
results.append(f"{serial}:{w}:{h}:{binning}:{1 if is_color else 0}:{model}:{current_fmt}")
print("|".join(results))
except Exception: print("NONE")
"""
try:
result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
output = result.stdout.strip()
if "NONE" in output or not output: return []
camera_list = []
entries = output.split('|')
for entry in entries:
parts = entry.split(':')
camera_list.append({
"serial": parts[0], "width": int(parts[1]), "height": int(parts[2]),
"binning": (parts[3] == '1'), "is_color": (parts[4] == '1'), "model": parts[5]
})
return camera_list
except Exception: return []
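The probe runs pypylon in a child interpreter so the parent process never opens the cameras itself, and the child ships results back on stdout as one colon-delimited record per camera (`serial:width:height:binning:is_color:model:pixel_format`), joined by `|`. Parsing such output, under those format assumptions (the serials and models below are made up, and `pixel_format` is included here for illustration even though the scanner above discards it):

```python
# Hypothetical probe output for two cameras, in the format emitted above.
output = ("12345678:1920:1080:1:1:acA1920-40uc:BayerBG8|"
          "87654321:1280:1024:0:0:acA1300-60gm:Mono8")

camera_list = []
for entry in output.split('|'):
    parts = entry.split(':')
    camera_list.append({
        "serial": parts[0],
        "width": int(parts[1]),
        "height": int(parts[2]),
        "binning": parts[3] == '1',     # camera supports 2x2 binning
        "is_color": parts[4] == '1',
        "model": parts[5],
        "pixel_format": parts[6],
    })
print(camera_list[0]["is_color"], camera_list[1]["model"])
```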


@ -0,0 +1,195 @@
import threading
import time
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GObject
class GStreamerPipeline(threading.Thread):
def __init__(self, detected_cams, target_num_cams, internal_width, internal_height, web_width, web_height):
super().__init__()
self.loop = GLib.MainLoop()
self.pipeline = None
self.target_num_cams = target_num_cams
self.internal_width = internal_width
self.internal_height = internal_height
self.web_width = web_width
self.web_height = web_height
self.frame_buffers = [None] * self.target_num_cams
self.buffer_locks = [threading.Lock() for _ in range(self.target_num_cams)]
self.seg_frame_buffers = [None] * self.target_num_cams
self.seg_buffer_locks = [threading.Lock() for _ in range(self.target_num_cams)]
self.current_fps = 0.0 # Will still report overall FPS, not per stream
self.frame_count = 0
self.start_time = time.time()
# Cameras are expected to arrive pre-sorted (color camera first, then mono) from app.py
self.sorted_cams = detected_cams
print(f"Sorted cameras for GStreamer: {self.sorted_cams}")
def run(self):
Gst.init(None)
self.build_pipeline()
if self.pipeline:
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
self.pipeline.set_state(Gst.State.NULL)
else:
print("GStreamer pipeline failed to build.")
def on_new_seg_sample_factory(self, stream_id):
def on_new_sample(sink):
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
with self.seg_buffer_locks[stream_id]:
self.seg_frame_buffers[stream_id] = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
return on_new_sample
def on_new_sample_factory(self, stream_id):
def on_new_sample(sink):
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
# Update overall FPS counter from the first stream
if stream_id == 0:
self.frame_count += 1
if self.frame_count % 30 == 0:
elapsed = time.time() - self.start_time
self.current_fps = 30 / float(elapsed) if elapsed > 0 else 0
self.start_time = time.time()
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
with self.buffer_locks[stream_id]:
self.frame_buffers[stream_id] = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
return on_new_sample
def build_pipeline(self):
sources_and_sinks_str = []
for i in range(self.target_num_cams):
if i < len(self.sorted_cams):
cam_info = self.sorted_cams[i]
serial = cam_info['serial']
is_color = cam_info['is_color']
print(f"Setting up pipeline for Stream {i}: {serial} [{'Color' if is_color else 'Mono'}]")
base_settings = f"pylonsrc device-serial-number={serial} " \
"cam::TriggerMode=Off " \
"cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 " \
"cam::DeviceLinkThroughputLimitMode=Off "
if is_color:
color_settings = f"{base_settings} " \
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 " \
"cam::GainAuto=Continuous " \
"cam::Width=1920 cam::Height=1080 " \
"cam::PixelFormat=BayerBG8 "
source_and_sink = (
f"{color_settings} ! "
"bayer2rgb ! " # Debayer
"videoconvert ! "
"video/x-raw,format=RGBA ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={self.internal_width}, height={self.internal_height}, framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
else:
mono_settings = f"{base_settings} " \
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 " \
"cam::GainAuto=Continuous "
if cam_info['binning']:
mono_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
source_and_sink = (
f"{mono_settings} ! "
"video/x-raw,format=GRAY8 ! "
"videoconvert ! "
f"tee name=t_{i} ! "
"queue ! "
"video/x-raw,format=I420 ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={self.internal_width}, height={self.internal_height}, framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True "
f"t_{i}. ! queue ! "
"videoconvert ! " # Placeholder for DeepStream
f"appsink name=seg_sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
else:
# Placeholder for disconnected cameras
source_and_sink = (
"videotestsrc pattern=black is-live=true ! "
f"videorate ! "
f"video/x-raw,width={self.internal_width},height={self.internal_height},format=I420,framerate=60/1 ! "
f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM),format=NV12,width={self.internal_width},height={self.internal_height},framerate=60/1 ! "
f"nvjpegenc quality=60 ! "
f"appsink name=sink_{i} emit-signals=True sync=False max-buffers=1 drop=True"
)
sources_and_sinks_str.append(source_and_sink)
pipeline_str = " ".join(sources_and_sinks_str)
print("\n--- GStreamer Pipeline String ---")
print(pipeline_str)
print("---------------------------------\n")
try:
self.pipeline = Gst.parse_launch(pipeline_str)
except GLib.Error as e:
# Gst.parse_launch raises GLib.Error on a malformed pipeline string rather than returning None
print(f"ERROR: GStreamer pipeline failed to parse: {e}")
return
for i in range(self.target_num_cams):
appsink = self.pipeline.get_by_name(f"sink_{i}")
if appsink:
# Set caps on appsink to ensure it's negotiating JPEG
appsink.set_property("caps", Gst.Caps.from_string("image/jpeg,width=(int)[1, 2147483647],height=(int)[1, 2147483647]"))
appsink.connect("new-sample", self.on_new_sample_factory(i))
else:
print(f"Error: appsink_{i} not found in pipeline.")
segsink = self.pipeline.get_by_name(f"seg_sink_{i}")
if segsink:
segsink.connect("new-sample", self.on_new_seg_sample_factory(i))
def get_frame_by_id(self, stream_id):
if 0 <= stream_id < self.target_num_cams:
with self.buffer_locks[stream_id]:
return self.frame_buffers[stream_id]
return None
def get_seg_frame_by_id(self, stream_id):
if 0 <= stream_id < self.target_num_cams:
with self.seg_buffer_locks[stream_id]:
return self.seg_frame_buffers[stream_id]
return None
def get_fps(self):
return round(self.current_fps, 1)
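The FPS figure reported above is a rolling estimate: every 30 frames the elapsed wall-clock time is measured, the rate is recomputed as `window / elapsed`, and the window restarts. The same logic isolated into a small helper (a sketch, not part of the pipeline code):

```python
import time

class RollingFps:
    """Estimate frames per second over a fixed-size window of frames."""
    def __init__(self, window=30):
        self.window = window
        self.count = 0
        self.start = time.time()
        self.fps = 0.0

    def tick(self):
        """Call once per frame; updates the estimate every `window` frames."""
        self.count += 1
        if self.count % self.window == 0:
            elapsed = time.time() - self.start
            self.fps = self.window / elapsed if elapsed > 0 else 0.0
            self.start = time.time()

fps = RollingFps(window=30)
for _ in range(30):
    fps.tick()
```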

src/unified_web_ui/run.py Normal file

@ -0,0 +1,301 @@
import sys
import subprocess
import threading
import time
import gc
import json
from flask import Flask, Response, render_template_string, jsonify
# --- CONFIGURATION ---
TARGET_NUM_CAMS = 3
DEFAULT_W = 1280
DEFAULT_H = 720
# --- PART 1: DETECTION (Unchanged) ---
def scan_connected_cameras():
print("--- Scanning for Basler Cameras ---")
detection_script = """
import sys
try:
from pypylon import pylon
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("NONE")
else:
results = []
for i in range(len(devices)):
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[i]))
cam.Open()
serial = cam.GetDeviceInfo().GetSerialNumber()
model = cam.GetDeviceInfo().GetModelName()
is_color = model.endswith("c") or "Color" in model
w = cam.Width.GetValue()
h = cam.Height.GetValue()
binning = 0
try:
cam.BinningHorizontal.Value = 2
cam.BinningVertical.Value = 2
cam.BinningHorizontal.Value = 1
cam.BinningVertical.Value = 1
binning = 1
except Exception: pass
current_fmt = cam.PixelFormat.GetValue()
cam.Close()
results.append(f"{serial}:{w}:{h}:{binning}:{1 if is_color else 0}:{model}:{current_fmt}")
print("|".join(results))
except Exception: print("NONE")
"""
try:
result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True)
output = result.stdout.strip()
if "NONE" in output or not output: return []
camera_list = []
entries = output.split('|')
for entry in entries:
parts = entry.split(':')
camera_list.append({
"serial": parts[0], "width": int(parts[1]), "height": int(parts[2]),
"binning": (parts[3] == '1'), "is_color": (parts[4] == '1'), "model": parts[5]
})
return camera_list
except Exception: return []
DETECTED_CAMS = scan_connected_cameras()
ACTUAL_CAMS_COUNT = len(DETECTED_CAMS)
# --- RESOLUTION LOGIC ---
if ACTUAL_CAMS_COUNT > 0:
MASTER_W = DETECTED_CAMS[0]['width']
MASTER_H = DETECTED_CAMS[0]['height']
else:
MASTER_W = DEFAULT_W
MASTER_H = DEFAULT_H
INTERNAL_WIDTH = 1280
scale = INTERNAL_WIDTH / MASTER_W
INTERNAL_HEIGHT = int(MASTER_H * scale)
if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1
WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS
scale_tiled = WEB_WIDTH / total_source_width
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1
print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT}")
for c in DETECTED_CAMS:
print(f" - Cam {c['serial']} ({c['model']}): {'COLOR' if c['is_color'] else 'MONO'}")
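With the defaults above and an assumed 1920x1080 master camera, the resolution logic works out as follows: each stream is scaled to a 1280-wide internal resolution, then the three tiles share the 1280-pixel web width, so each tile's height shrinks by a factor of three. Parities are bumped to even because some GStreamer elements reject odd dimensions.

```python
# Worked example of the resolution logic above, assuming a 1920x1080 master camera.
TARGET_NUM_CAMS = 3
MASTER_W, MASTER_H = 1920, 1080

INTERNAL_WIDTH = 1280
scale = INTERNAL_WIDTH / MASTER_W                      # 2/3
INTERNAL_HEIGHT = int(MASTER_H * scale)
if INTERNAL_HEIGHT % 2 != 0:
    INTERNAL_HEIGHT += 1                               # even dimensions for GStreamer

WEB_WIDTH = 1280
total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS  # 3840
scale_tiled = WEB_WIDTH / total_source_width           # 1/3
WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled)
if WEB_HEIGHT % 2 != 0:
    WEB_HEIGHT += 1

print(INTERNAL_WIDTH, INTERNAL_HEIGHT, WEB_WIDTH, WEB_HEIGHT)  # → 1280 720 1280 240
```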
# --- FLASK & GSTREAMER ---
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
app = Flask(__name__)
frame_buffer = None
buffer_lock = threading.Lock()
current_fps = 0.0
frame_count = 0
start_time = time.time()
class GStreamerPipeline(threading.Thread):
def __init__(self):
super().__init__()
self.loop = GLib.MainLoop()
self.pipeline = None
def run(self):
Gst.init(None)
self.build_pipeline()
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except Exception as e:
print(f"Error: {e}")
finally:
self.pipeline.set_state(Gst.State.NULL)
def on_new_sample(self, sink):
global frame_count, start_time, current_fps
sample = sink.emit("pull-sample")
if not sample: return Gst.FlowReturn.ERROR
frame_count += 1
if frame_count % 30 == 0:
elapsed = time.time() - start_time
current_fps = 30 / elapsed if elapsed > 0 else 0
start_time = time.time()
buffer = sample.get_buffer()
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: return Gst.FlowReturn.ERROR
global frame_buffer
with buffer_lock:
frame_buffer = bytes(map_info.data)
buffer.unmap(map_info)
return Gst.FlowReturn.OK
def build_pipeline(self):
sources_str = ""
for i in range(TARGET_NUM_CAMS):
if i < len(DETECTED_CAMS):
cam_info = DETECTED_CAMS[i]
serial = cam_info['serial']
is_color = cam_info['is_color']
print(f"Slot {i}: Linking {serial} [{'Color' if is_color else 'Mono'}]")
# --- 1. BASE SETTINGS (Common) ---
# We DISABLE Throughput Limit to allow high bandwidth
base_settings = (
f"pylonsrc device-serial-number={serial} "
"cam::TriggerMode=Off "
"cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 "
"cam::DeviceLinkThroughputLimitMode=Off "
)
# Pre-scaler
pre_scale = (
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12, width={INTERNAL_WIDTH}, height={INTERNAL_HEIGHT}, framerate=60/1 ! "
)
if is_color:
# --- 2A. COLOR SETTINGS (High Speed) ---
# FIX: Force ExposureTime=20000.0 (20ms) even for Color.
# If we leave it on Auto, it will slow down the Mono cameras.
# We rely on 'GainAuto' to make the image bright enough.
color_settings = (
f"{base_settings} "
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 "
"cam::GainAuto=Continuous "
"cam::Width=1920 cam::Height=1080 cam::OffsetX=336 cam::OffsetY=484 "
"cam::PixelFormat=BayerBG8 " # Force Format
)
source = (
f"{color_settings} ! "
"bayer2rgb ! " # Debayer
"videoconvert ! "
"video/x-raw,format=RGBA ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12 ! "
f"{pre_scale}"
f"m.sink_{i} "
)
else:
# --- 2B. MONO SETTINGS (High Speed) ---
# Force ExposureTime=20000.0
mono_settings = (
f"{base_settings} "
"cam::ExposureAuto=Off cam::ExposureTime=20000.0 "
"cam::GainAuto=Continuous "
)
if cam_info['binning']:
mono_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
source = (
f"{mono_settings} ! "
"video/x-raw,format=GRAY8 ! "
"videoconvert ! "
"video/x-raw,format=I420 ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM), format=NV12 ! "
f"{pre_scale}"
f"m.sink_{i} "
)
else:
# --- DISCONNECTED PLACEHOLDER ---
source = (
f"videotestsrc pattern=black is-live=true ! "
f"videorate ! "
f"video/x-raw,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},format=I420,framerate=60/1 ! "
f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! "
"nvvideoconvert compute-hw=1 ! "
f"video/x-raw(memory:NVMM),format=NV12,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},framerate=60/1 ! "
f"m.sink_{i} "
)
sources_str += source
# 3. MUXER & PROCESSING
processing = (
f"nvstreammux name=m batch-size={TARGET_NUM_CAMS} width={INTERNAL_WIDTH} height={INTERNAL_HEIGHT} "
f"live-source=1 batched-push-timeout=33000 ! "
f"nvmultistreamtiler width={WEB_WIDTH} height={WEB_HEIGHT} rows=1 columns={TARGET_NUM_CAMS} ! "
"nvvideoconvert compute-hw=1 ! "
"video/x-raw(memory:NVMM) ! "
"videorate drop-only=true ! "
"video/x-raw(memory:NVMM), framerate=30/1 ! "
f"nvjpegenc quality=60 ! "
"appsink name=sink emit-signals=True sync=False max-buffers=1 drop=True"
)
pipeline_str = f"{sources_str} {processing}"
print(f"Launching Optimized Pipeline (All Cams Forced to 20ms Shutter)...")
self.pipeline = Gst.parse_launch(pipeline_str)
appsink = self.pipeline.get_by_name("sink")
appsink.connect("new-sample", self.on_new_sample)
# --- FLASK ---
@app.route('/')
def index():
return render_template_string('''
<html>
<head>
<style>
body { background-color: #111; color: white; text-align: center; font-family: monospace; margin: 0; padding: 20px; }
.container { position: relative; display: inline-block; border: 3px solid #4CAF50; }
img { display: block; max-width: 100%; height: auto; }
.hud {
position: absolute; top: 10px; left: 10px;
background: rgba(0, 0, 0, 0.6); color: #00FF00;
padding: 5px 10px; font-weight: bold; pointer-events: none;
}
</style>
</head>
<body>
<h1>Basler Final Feed</h1>
<div class="container">
<div class="hud" id="fps-counter">FPS: --</div>
<img src="{{ url_for('video_feed') }}">
</div>
<script>
setInterval(function() {
fetch('/get_fps').then(r => r.json()).then(d => {
document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
});
}, 500);
</script>
</body>
</html>
''')
@app.route('/video_feed')
def video_feed():
def generate():
count = 0
while True:
with buffer_lock:
frame = frame_buffer # grab the current frame, then release the lock before yielding
if frame:
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.016)
count += 1
if count % 200 == 0: gc.collect()
return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/get_fps')
def get_fps():
return jsonify(fps=round(current_fps, 1))
if __name__ == "__main__":
gc.collect() # collect in this process; running gc.collect() in a subprocess would not affect this interpreter
gst_thread = GStreamerPipeline()
gst_thread.daemon = True
gst_thread.start()
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)


@ -0,0 +1,436 @@
body {
background-color: #1a1a1a; /* Darker gray */
color: #ffffff;
font-family: Arial, sans-serif; /* Reverted to original font */
margin: 0;
padding-top: 20px; /* Added padding to top for overall spacing */
padding-bottom: 20px; /* Added padding to bottom for overall spacing */
box-sizing: border-box; /* Ensure padding is included in height */
display: flex; /* Changed to flex */
flex-direction: column; /* Set flex direction to column */
height: 100vh; /* Make body fill viewport height */
gap: 20px; /* Added gap between flex items (h1 and main-container) */
}
h1 {
color: #64ffda; /* Kept existing color */
text-align: center;
margin: 0; /* Removed explicit margins */
}
.main-container {
display: flex; /* Desktop default */
flex-direction: row;
flex-grow: 1; /* Make main-container fill remaining vertical space */
width: 100%;
/* Removed max-width to allow full screen utilization */
margin: 0 auto;
/* Removed height: calc(100vh - 80px); */
/* Removed padding: 20px; */
box-sizing: border-box; /* Ensure padding is included in element's total width and height */
gap: 20px; /* Added spacing between the two main sections */
}
/* Tabs are hidden by default on desktop, dynamically added for mobile */
.tabs {
display: none;
}
.content-section {
display: block; /* Desktop default */
padding: 5px; /* Reduced padding further */
overflow-y: auto;
}
/* --- Lamp View (Original styles adapted to dark theme) --- */
.lamp-view {
flex: 0 0 auto; /* Allow content to determine width, do not shrink */
/* Removed min-width as padding will affect total width */
padding-left: 2vw; /* Added 2vw padding on the left side */
padding-right: 2vw; /* Added 2vw padding on the right side */
border-right: 1px solid #333; /* Reintroduced the line separating the sections */
display: flex;
flex-direction: column;
align-items: center;
overflow-y: auto; /* Added to allow vertical scrolling if its content is too tall */
}
.lamp-view .container { /* Added for original styling effect */
display: flex;
flex-direction: column;
align-items: center;
position: relative;
width: 100%;
}
.lamp-view .main-content { /* Added for original styling effect */
display: flex;
flex-direction: column; /* Changed to column to stack matrix and controls vertically */
align-items: center; /* Changed to center to horizontally center its children */
gap: 20px; /* Adjusted gap for vertical stacking */
flex-wrap: wrap; /* Allow wrapping for responsiveness - not strictly needed for column but kept for safety */
justify-content: center; /* This will center the column within the lamp-view if its width allows */
width: 100%; /* Ensure main-content fills lamp-view's width */
}
.matrix-grid {
display: grid;
grid-template-columns: repeat(5, 70px); /* Fixed 5-column grid */
grid-template-rows: repeat(5, 70px);
gap: 20px;
padding: 20px;
background-color: #333;
border-radius: 10px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.2);
margin-bottom: 20px; /* Kept margin-bottom for spacing below grid */
/* Removed width: 100%; to let grid determine its own width */
box-sizing: border-box; /* Account for padding */
}
.lamp {
width: 70px;
height: 70px;
border-radius: 10%; /* Reverted to original square with rounded corners */
background-color: #000;
transition: box-shadow 0.2s, transform 0.1s;
cursor: pointer;
border: 2px solid transparent;
}
.lamp.on {
box-shadow: 0 0 15px currentColor, 0 0 25px currentColor;
}
.lamp.selected {
border: 2px solid #fff;
transform: scale(1.1);
}
.region-control {
margin-bottom: 20px; /* Kept margin-bottom for spacing below region-control */
/* Removed text-align: center; as parent's align-items will handle centering */
width: 470px; /* Explicitly set width to match matrix grid */
box-sizing: border-box; /* Ensure padding/border included in width */
}
.region-control select {
padding: 10px 15px;
font-size: 14px;
cursor: pointer;
border: 1px solid #64ffda; /* Adapted to theme */
border-radius: 5px;
background-color: #333; /* Adapted to theme */
color: #ffffff;
width: 100%; /* Fill parent's width */
box-sizing: border-box; /* Include padding in width */
}
.control-panel, .center-lamp-control {
background-color: #444; /* Adapted to theme */
padding: 20px;
border-radius: 10px;
width: 470px; /* Explicitly set width to match matrix grid */
margin-bottom: 20px; /* Kept margin-bottom for spacing below control panel */
box-sizing: border-box; /* Account for padding */
}
.control-panel.inactive-control {
background-color: #333;
filter: saturate(0.2);
}
.control-panel.inactive-control .slider-row {
pointer-events: none;
}
.control-panel h2, .center-lamp-control h2 {
color: #64ffda; /* Adapted to theme */
font-size: 16px;
margin-bottom: 10px;
text-align: center;
}
.slider-group {
width: 100%;
display: flex;
flex-direction: column;
gap: 5px;
}
.slider-row {
display: grid;
grid-template-columns: 150px 1fr 50px; /* Adjusted last column for number input buttons */
gap: 10px;
align-items: center;
}
.slider-group input[type="range"] {
-webkit-appearance: none;
height: 8px;
border-radius: 5px;
outline: none;
cursor: pointer;
background: #555; /* Adapted to theme */
}
.slider-group input[type="number"] {
-webkit-appearance: none; /* Hide default spinner for Chrome, Safari */
-moz-appearance: textfield; /* Hide default spinner for Firefox */
text-align: center; /* Center the number */
width: auto; /* Allow flex-grow to manage width */
font-size: 14px;
border: none; /* Will be part of the new control's border */
border-radius: 0; /* No radius on its own if part of a group */
padding: 5px;
background-color: #333; /* Adapted to theme */
color: #ffffff;
}
/* Specifically hide number input spinner buttons */
.slider-group input[type="number"]::-webkit-inner-spin-button,
.slider-group input[type="number"]::-webkit-outer-spin-button {
-webkit-appearance: none;
margin: 0;
}
.slider-group input[type="range"]::-webkit-slider-thumb {
-webkit-appearance: none;
height: 20px;
width: 20px;
border-radius: 50%;
background: #64ffda; /* Adapted to theme */
cursor: pointer;
box-shadow: 0 0 5px rgba(0,0,0,0.5);
margin-top: 2px;
}
.slider-group input[type="range"]::-webkit-slider-runnable-track {
height: 24px;
border-radius: 12px;
}
input.white-3000k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #ffc080); }
input.white-6500k::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #c0e0ff); }
input.blue::-webkit-slider-runnable-track { background: linear-gradient(to right, #000, #00f); }
.slider-label {
color: #ffffff; /* Adapted to theme */
font-size: 14px;
text-align: left;
white-space: nowrap;
width: 120px;
}
.inactive-control .slider-label {
color: #888;
}
/* --- New styles for number input controls --- */
.number-input-controls {
display: flex;
align-items: stretch; /* Stretch children to fill container height */
gap: 2px; /* Small gap between buttons and input */
flex-shrink: 0; /* Prevent the control group from shrinking in the grid */
}
.number-input-controls input[type="number"] {
flex-grow: 1; /* Make it fill available space */
text-align: center;
border: 1px solid #64ffda; /* Border for the number input */
border-radius: 5px;
background-color: #333;
color: #ffffff;
min-width: 40px; /* Ensure it doesn't get too small */
}
.number-input-controls button {
width: 30px; /* Fixed width */
background-color: #64ffda; /* Accent color */
color: #1a1a1a; /* Dark text */
border: none;
border-radius: 5px;
font-size: 16px;
font-weight: bold;
cursor: pointer;
transition: background-color 0.2s;
display: flex; /* Center content */
justify-content: center;
align-items: center;
line-height: 1; /* Prevent extra height from line-height */
padding: 0; /* Remove default button padding */
}
.number-input-controls button:hover {
background-color: #4ed8bd; /* Lighter accent on hover */
}
.number-input-controls button:active {
background-color: #3cb89f; /* Darker accent on click */
}
/* Adjust slider-row grid to accommodate new number input controls */
.slider-row {
grid-template-columns: 150px 1fr 100px; /* Label, Range, NumberInputGroup(approx 30+30+2+40=102px) */
}
/* --- Camera View (Individual streams) --- */
.camera-view {
flex: 1; /* Allow it to grow and shrink to fill available space */
height: 100%; /* Added to make it fill the height of its parent */
overflow-y: auto; /* Added to allow vertical scrolling if content exceeds height */
/* Removed width: 75%; */
display: flex;
flex-direction: column;
align-items: center;
justify-content: flex-start; /* Align items to start for title */
position: relative;
gap: 10px; /* Space between elements */
}
.camera-streams-grid {
display: grid; /* Use CSS Grid */
/* Removed width: 100%; */
/* Removed height: 100%; */
flex-grow: 1; /* Allow it to grow to fill available space */
grid-template-rows: 1fr 2fr; /* 1/3 for color, 2/3 for monos */
grid-template-columns: 1fr; /* Single column for the main layout */
gap: 10px;
padding: 0 5px; /* Reduced horizontal padding */
}
.camera-color-row {
grid-row: 1;
grid-column: 1;
display: flex;
justify-content: center;
align-items: center;
overflow: hidden; /* Ensure content is clipped */
height: 100%; /* Explicitly set height to fill grid cell */
}
.camera-mono-row {
grid-row: 2;
grid-column: 1;
display: grid;
grid-template-columns: 1fr 1fr; /* Two columns for the mono cameras */
gap: 10px;
overflow: hidden; /* Ensure content is clipped */
height: 100%; /* Explicitly set height to fill grid cell */
}
.camera-container-individual {
position: relative;
border: 1px solid #333;
display: flex; /* Changed to flex for centering image */
justify-content: center;
align-items: center;
background-color: transparent;
aspect-ratio: var(--aspect-ratio); /* Keep aspect-ratio on container */
max-width: 100%; /* Re-added max-width */
/* Removed height: 100%; */
max-height: 100%; /* Ensure it doesn't exceed the boundaries of its parent */
overflow: hidden; /* Ensure image fits and is clipped if necessary */
box-sizing: border-box; /* Include padding and border in the element's total width and height */
border-radius: 10px; /* Added corner radius */
}
.camera-stream-individual {
max-width: 100%;
max-height: 100%;
object-fit: contain;
border-radius: 10px; /* Added corner radius to the image itself */
}
.camera-label {
position: absolute;
bottom: 5px;
left: 5px;
background: rgba(0, 0, 0, 0.6);
color: #fff;
padding: 3px 6px;
font-size: 12px;
border-radius: 3px;
}
.hud {
position: absolute; /* Kept existing position for FPS counter */
top: 10px;
right: 10px; /* Moved to right for better placement in new layout */
background: rgba(0, 0, 0, 0.6);
color: #00FF00;
padding: 5px 10px;
font-weight: bold;
pointer-events: none;
}
/* --- Responsive Design --- */
@media (max-width: 768px) {
.main-container {
flex-direction: column;
height: auto;
max-width: 100%;
}
.tabs {
display: flex; /* Show tabs on mobile */
justify-content: space-around;
background-color: #333;
padding: 10px 0;
}
.tab-link {
background-color: #333;
color: #ffffff;
border: none;
padding: 10px 15px;
cursor: pointer;
transition: background-color 0.3s;
}
.tab-link.active {
background-color: #64ffda;
color: #1a1a1a;
}
.lamp-view, .camera-view {
width: 100%;
border: none;
}
.content-section {
display: none; /* Hide tab content by default on mobile */
}
.content-section.active {
display: block; /* Show active tab content on mobile */
}
.lamp-view .main-content {
flex-direction: column;
align-items: center;
}
.control-panel, .center-lamp-control {
width: 100%;
max-width: none;
}
.camera-streams-grid {
/* On mobile, stack cameras */
grid-template-rows: auto; /* Revert to auto rows */
grid-template-columns: 1fr; /* Single column */
padding: 0;
}
.camera-color-row, .camera-mono-row {
grid-row: auto;
grid-column: auto;
display: flex; /* Change mono-row to flex for stacking vertically on mobile */
flex-direction: column;
gap: 10px;
}
.camera-container-individual {
width: 100%;
height: auto; /* Let aspect-ratio define height */
}
}

@@ -0,0 +1,423 @@
<!DOCTYPE html>
<html>
<head>
<title>Pupilometer Unified Control</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
</head>
<body>
<h1>Pupilometer Unified Control</h1>
<div class="main-container">
<!-- The content sections will be populated based on the view -->
<div id="lamp" class="content-section lamp-view">
<!-- Lamp Control UI goes here -->
<div class="container">
<h2>Lamp Matrix Control</h2>
<div class="region-control">
<label for="region-select">Select Region:</label>
<select id="region-select">
<option value="" disabled selected>-- Select a region --</option>
<option value="Upper">Upper</option>
<option value="Lower">Lower</option>
<option value="Left">Left</option>
<option value="Right">Right</option>
<option value="Inner ring">Inner ring</option>
<option value="Outer ring">Outer ring</option>
<option value="All">All</option>
</select>
</div>
<div class="main-content">
<div class="matrix-grid">
{% for row in range(5) %}
{% for col in range(5) %}
<div class="lamp" data-row="{{ row }}" data-col="{{ col }}" style="background-color: {{ matrix[row][col] }};"></div>
{% endfor %}
{% endfor %}
</div>
<div class="slider-controls">
<div class="center-lamp-control">
<h2>Center Lamp</h2>
<div class="slider-group center-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="center-ww-slider" min="0" max="255" value="0" class="white-3000k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-ww-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="center-cw-slider" min="0" max="255" value="0" class="white-6500k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-cw-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="center-blue-slider" min="0" max="255" value="0" class="blue">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="center-blue-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
</div>
</div>
<div class="control-panel">
<h2>Selected Region</h2>
<div class="slider-group region-slider-group">
<div class="slider-row">
<span class="slider-label">Warm White (3000K)</span>
<input type="range" id="ww-slider" min="0" max="255" value="0" class="white-3000k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="ww-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Cool White (6500K)</span>
<input type="range" id="cw-slider" min="0" max="255" value="0" class="white-6500k">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="cw-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
<div class="slider-row">
<span class="slider-label">Blue</span>
<input type="range" id="blue-slider" min="0" max="255" value="0" class="blue">
<div class="number-input-controls">
<button type="button" class="decrement-btn">-</button>
<input type="number" id="blue-number" min="0" max="255" value="0">
<button type="button" class="increment-btn">+</button>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<div id="camera" class="content-section camera-view">
<h2>Basler Final Feed</h2>
<div class="camera-streams-grid">
<div class="camera-color-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if cam_info.is_color %}
<div class="camera-container-individual {% if cam_info.is_color %}camera-color{% else %}camera-mono{% endif %}" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('video_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual">
<div class="camera-label">{{ cam_info.model }} ({{ 'Color' if cam_info.is_color else 'Mono' }})</div>
</div>
{% endif %}
{% endfor %}
</div>
<div class="camera-mono-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if not cam_info.is_color %}
<div class="camera-container-individual {% if cam_info.is_color %}camera-color{% else %}camera-mono{% endif %}" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('video_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual">
<div class="camera-label">{{ cam_info.model }} ({{ 'Color' if cam_info.is_color else 'Mono' }})</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>
<div class="hud" id="fps-counter">FPS: --</div>
</div>
<div id="segmentation" class="content-section camera-view">
<h2>Segmentation Feed</h2>
<div class="camera-streams-grid">
<div class="camera-mono-row">
{% for cam_index in range(detected_cams_info|length) %}
{% set cam_info = detected_cams_info[cam_index] %}
{% if not cam_info.is_color %}
<div class="camera-container-individual camera-mono" style="--aspect-ratio: {{ cam_info.aspect_ratio }};">
<img src="{{ url_for('segmentation_feed', stream_id=cam_index) }}?t={{ now().timestamp() }}" class="camera-stream-individual" id="segmentation-feed-{{- cam_index -}}">
<div class="camera-label">{{ cam_info.model }} (Segmentation)</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>
</div>
</div>
<script>
// FPS counter
setInterval(function() {
fetch('/get_fps').then(r => r.json()).then(d => {
document.getElementById('fps-counter').innerText = "FPS: " + d.fps;
});
}, 500);
// State for the entire 5x5 matrix, storing {ww, cw, blue} for each lamp
var lampMatrixState = Array(5).fill(null).map(() => Array(5).fill(null).map(() => ({ww: 0, cw: 0, blue: 0})));
var selectedLamps = [];
// Function to calculate a visual RGB color from the three light values using a proper additive model
function calculateRgb(ww, cw, blue) {
const warmWhiteR = 255, warmWhiteG = 192, warmWhiteB = 128;
const coolWhiteR = 192, coolWhiteG = 224, coolWhiteB = 255;
const blueR = 0, blueG = 0, blueB = 255;
var r = (ww / 255) * warmWhiteR + (cw / 255) * coolWhiteR + (blue / 255) * blueR;
var g = (ww / 255) * warmWhiteG + (cw / 255) * coolWhiteG + (blue / 255) * blueG;
var b = (ww / 255) * warmWhiteB + (cw / 255) * coolWhiteB + (blue / 255) * blueB;
r = Math.min(255, Math.round(r));
g = Math.min(255, Math.round(g));
b = Math.min(255, Math.round(b));
var toHex = (c) => ('0' + c.toString(16)).slice(-2);
return '#' + toHex(r) + toHex(g) + toHex(b);
}
function updateLampUI(lamp, colorState) {
var newColor = calculateRgb(colorState.ww, colorState.cw, colorState.blue);
var lampElement = $(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`);
lampElement.css('background-color', newColor);
if (newColor === '#000000') {
lampElement.removeClass('on');
lampElement.css('box-shadow', `inset 0 0 5px rgba(0,0,0,0.5)`);
} else {
lampElement.addClass('on');
lampElement.css('box-shadow', `0 0 15px ${newColor}, 0 0 25px ${newColor}`);
}
}
function sendFullMatrixUpdate(lampsToUpdate, isRegionUpdate = false) {
var fullMatrixData = lampMatrixState.map(row => row.map(lamp => ({
ww: lamp.ww,
cw: lamp.cw,
blue: lamp.blue
})));
$.ajax({
url: '/set_matrix',
type: 'POST',
contentType: 'application/json',
data: JSON.stringify({ matrix: fullMatrixData }),
success: function(response) {
if (response.success) {
if (isRegionUpdate) {
for (var r = 0; r < 5; r++) {
for (var c = 0; c < 5; c++) {
updateLampUI({row: r, col: c}, lampMatrixState[r][c]);
}
}
} else {
lampsToUpdate.forEach(function(lamp) {
updateLampUI(lamp, lampMatrixState[lamp.row][lamp.col]);
});
}
}
}
});
}
function updateSliders(ww, cw, blue, prefix = '') {
$(`#${prefix}ww-slider`).val(ww);
$(`#${prefix}cw-slider`).val(cw);
$(`#${prefix}blue-slider`).val(blue);
$(`#${prefix}ww-number`).val(ww);
$(`#${prefix}cw-number`).val(cw);
$(`#${prefix}blue-number`).val(blue);
}
$(document).ready(function() {
var regionMaps = {
'Upper': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
],
'Lower': [
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'Left': [
{row: 0, col: 0}, {row: 1, col: 0}, {row: 2, col: 0}, {row: 3, col: 0}, {row: 4, col: 0},
{row: 0, col: 1}, {row: 1, col: 1}, {row: 2, col: 1}, {row: 3, col: 1}, {row: 4, col: 1},
],
'Right': [
{row: 0, col: 3}, {row: 1, col: 3}, {row: 2, col: 3}, {row: 3, col: 3}, {row: 4, col: 3},
{row: 0, col: 4}, {row: 1, col: 4}, {row: 2, col: 4}, {row: 3, col: 4}, {row: 4, col: 4},
],
'Inner ring': [
{row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3},
{row: 2, col: 1}, {row: 2, col: 3},
{row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}
],
'Outer ring': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
],
'All': [
{row: 0, col: 0}, {row: 0, col: 1}, {row: 0, col: 2}, {row: 0, col: 3}, {row: 0, col: 4},
{row: 1, col: 0}, {row: 1, col: 1}, {row: 1, col: 2}, {row: 1, col: 3}, {row: 1, col: 4},
{row: 2, col: 0}, {row: 2, col: 1}, {row: 2, col: 3}, {row: 2, col: 4},
{row: 3, col: 0}, {row: 3, col: 1}, {row: 3, col: 2}, {row: 3, col: 3}, {row: 3, col: 4},
{row: 4, col: 0}, {row: 4, col: 1}, {row: 4, col: 2}, {row: 4, col: 3}, {row: 4, col: 4},
]
};
// Defensive: ensure the center lamp (2,2) is never part of the 'All' region
regionMaps['All'] = regionMaps['All'].filter(lamp => !(lamp.row === 2 && lamp.col === 2));
$('.lamp').each(function() {
var row = $(this).data('row');
var col = $(this).data('col');
var color = $(this).css('background-color');
var rgb = color.match(/\d+/g);
lampMatrixState[row][col] = {
ww: parseInt(rgb[0], 10), cw: parseInt(rgb[1], 10), blue: parseInt(rgb[2], 10)
};
});
$('#region-select').on('change', function() {
var region = $(this).val();
if (region) {
$('.control-panel').removeClass('inactive-control');
} else {
$('.control-panel').addClass('inactive-control');
}
var newlySelectedLamps = regionMaps[region];
$('.lamp').removeClass('selected');
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
var centerLampState = lampMatrixState[2][2];
lampMatrixState = Array(5).fill(null).map(() => Array(5).fill(null).map(() => ({ww: 0, cw: 0, blue: 0})));
lampMatrixState[2][2] = centerLampState;
selectedLamps = newlySelectedLamps;
selectedLamps.forEach(function(lamp) {
$(`.lamp[data-row="${lamp.row}"][data-col="${lamp.col}"]`).addClass('selected');
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
});
if (selectedLamps.length > 0) {
var firstLamp = selectedLamps[0];
var firstLampState = lampMatrixState[firstLamp.row][firstLamp.col];
updateSliders(firstLampState.ww, firstLampState.cw, firstLampState.blue, '');
}
sendFullMatrixUpdate(lampsToUpdate, true);
});
$('.region-slider-group input').on('input', function() {
if (selectedLamps.length === 0) return;
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#ww-slider').val());
var cw = parseInt($('#cw-slider').val());
var blue = parseInt($('#blue-slider').val());
var lampsToUpdate = [];
selectedLamps.forEach(function(lamp) {
lampMatrixState[lamp.row][lamp.col] = {ww: ww, cw: cw, blue: blue};
lampsToUpdate.push(lamp);
});
sendFullMatrixUpdate(lampsToUpdate);
});
$('.center-slider-group input').on('input', function() {
var target = $(this);
var originalVal = target.val();
var value = parseInt(originalVal, 10);
if (isNaN(value) || value < 0) { value = 0; }
if (value > 255) { value = 255; }
if (target.is('[type="number"]') && value.toString() !== originalVal) {
target.val(value);
}
var id = target.attr('id');
if (target.is('[type="range"]')) {
$(`#${id.replace('-slider', '-number')}`).val(value);
} else if (target.is('[type="number"]')) {
$(`#${id.replace('-number', '-slider')}`).val(value);
}
var ww = parseInt($('#center-ww-slider').val());
var cw = parseInt($('#center-cw-slider').val());
var blue = parseInt($('#center-blue-slider').val());
var centerLamp = {row: 2, col: 2};
lampMatrixState[centerLamp.row][centerLamp.col] = {ww: ww, cw: cw, blue: blue};
sendFullMatrixUpdate([centerLamp]);
});
// Handle increment/decrement buttons
$('.number-input-controls button').on('click', function() {
var btn = $(this);
var numberInput = btn.siblings('input[type="number"]');
var currentVal = parseInt(numberInput.val(), 10) || 0;
var min = parseInt(numberInput.attr('min'), 10);
var max = parseInt(numberInput.attr('max'), 10);
if (btn.hasClass('decrement-btn')) {
currentVal = Math.max(min, currentVal - 1);
} else if (btn.hasClass('increment-btn')) {
currentVal = Math.min(max, currentVal + 1);
}
numberInput.val(currentVal);
// Trigger the 'input' event to propagate the change to the slider and matrix update logic
numberInput.trigger('input');
});
if (!$('#region-select').val()) {
$('.control-panel').addClass('inactive-control');
}
// Mobile tab handling
if (window.innerWidth <= 768) {
// Dynamically add tab buttons
const tabsDiv = $('<div class="tabs"></div>');
tabsDiv.append('<button class="tab-link" data-tab="camera">Camera</button>');
tabsDiv.append('<button class="tab-link" data-tab="lamp">Lamp Control</button>');
// Prepend tabsDiv to .main-container
$('.main-container').prepend(tabsDiv);
// Hide all content sections initially
$('.content-section').hide();
// Show the camera section by default
$('#camera').show();
// Make the Camera tab active
$('.tab-link[data-tab="camera"]').addClass('active');
// Add click handlers for tab buttons
$('.tab-link').on('click', function() {
$('.tab-link').removeClass('active');
$(this).addClass('active');
$('.content-section').hide();
$(`#${$(this).data('tab')}`).show();
});
}
});
</script>
</body>
</html>
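The additive color mixing used by the template's `calculateRgb()` can be sketched in Python (an illustrative mirror, not part of the diff; the primary RGB triples are the same approximations the JS uses):

```python
def calculate_rgb(ww, cw, blue):
    """Additively mix warm-white, cool-white, and blue channel values
    (each 0-255) into a display hex color, clamping each channel at 255."""
    warm = (255, 192, 128)  # warm white primary (~3000K approximation)
    cool = (192, 224, 255)  # cool white primary (~6500K approximation)
    blu = (0, 0, 255)
    channels = []
    for i in range(3):
        v = (ww / 255) * warm[i] + (cw / 255) * cool[i] + (blue / 255) * blu[i]
        channels.append(min(255, round(v)))
    return '#%02x%02x%02x' % tuple(channels)
```

For example, full warm white alone yields `#ffc080`, and driving all three channels saturates every component to white.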

@@ -0,0 +1,58 @@
from pypylon import pylon
import time
import sys
try:
# Get the Transport Layer Factory
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
print("No cameras found!")
sys.exit(1)
print(f"Found {len(devices)} cameras. Checking Camera 1...")
# Connect to first camera
cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
cam.Open()
# 1. Reset to Defaults
print("Resetting to Defaults...")
cam.UserSetSelector.Value = "Default"
cam.UserSetLoad.Execute()
# 2. Enable Auto Exposure/Gain
print("Enabling Auto Exposure & Gain...")
cam.ExposureAuto.Value = "Continuous"
cam.GainAuto.Value = "Continuous"
# 3. Wait for it to settle (Camera adjusts to light)
print("Waiting 3 seconds for auto-adjustment...")
for i in range(3):
print(f"{3-i}...")
time.sleep(1)
# 4. READ VALUES
current_exposure = cam.ExposureTime.GetValue() # In Microseconds (us)
current_fps_readout = cam.ResultingFrameRate.GetValue()
print("-" * 30)
print(f"REPORT FOR SERIAL: {cam.GetDeviceInfo().GetSerialNumber()}")
print("-" * 30)
print(f"Current Exposure Time: {current_exposure:.1f} us ({current_exposure/1000:.1f} ms)")
print(f"Theoretical Max FPS: {1000000 / current_exposure:.1f} FPS")
print(f"Camera Internal FPS: {current_fps_readout:.1f} FPS")
print("-" * 30)
if current_exposure > 33000:
print("⚠️ PROBLEM FOUND: Exposure is > 33ms.")
print(" This physically prevents the camera from reaching 30 FPS.")
print(" Solution: Add more light or limit AutoExposureUpperLimit.")
else:
print("✅ Exposure looks fast enough for 30 FPS.")
cam.Close()
except Exception as e:
print(f"Error: {e}")
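The 33 ms threshold in the check above follows from the frame period at 30 FPS: the sensor cannot begin a new frame before the current exposure ends. A minimal sketch of that relationship (helper name is mine, not from the script):

```python
def max_fps_for_exposure(exposure_us):
    """Frame-rate ceiling imposed by exposure time alone: with an
    exposure of exposure_us microseconds, at most 1e6/exposure_us
    frames can fit into each second."""
    return 1_000_000 / exposure_us

# At 30 FPS the frame period is ~33,333 us, so any exposure above
# ~33 ms makes 30 FPS physically unreachable.
```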

@@ -0,0 +1,16 @@
#!/bin/bash
# Test the main page
echo "Testing main page..."
curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/
echo ""
# Test the get_fps endpoint
echo "Testing get_fps endpoint..."
curl -s -o /dev/null -w "%{http_code}" http://localhost:5000/get_fps
echo ""
# Test the set_matrix endpoint
echo "Testing set_matrix endpoint..."
curl -s -o /dev/null -w "%{http_code}" -X POST -H "Content-Type: application/json" -d '{"matrix": [[{"ww":0,"cw":0,"blue":0}]]}' http://localhost:5000/set_matrix
echo ""

@@ -0,0 +1,52 @@
import re
from playwright.sync_api import Page, expect
def test_ui_elements_mobile(page: Page):
page.set_viewport_size({"width": 375, "height": 667})
page.goto("http://localhost:5000/")
# Check for main title
expect(page).to_have_title("Pupilometer Unified Control")
# Wait for dynamically added tabs to be attached to the DOM
page.wait_for_selector(".tabs", state="attached")
# Check for dynamically added tabs visibility on mobile
expect(page.locator(".tabs")).to_be_visible()
expect(page.locator(".tab-link[data-tab='camera']")).to_be_visible()
expect(page.locator(".tab-link[data-tab='lamp']")).to_be_visible()
# Check for camera view content
expect(page.locator("#camera h2")).to_contain_text("Basler Final Feed")
expect(page.locator("#fps-counter")).to_be_visible()
expect(page.locator("#camera .camera-streams-grid .camera-container-individual")).to_have_count(3)
expect(page.locator(".camera-streams-grid .camera-label").first).to_be_visible()
# Check for lamp view content
page.locator(".tab-link[data-tab='lamp']").click()
expect(page.locator("#lamp .container > h2")).to_contain_text("Lamp Matrix Control")
expect(page.locator("#region-select")).to_be_visible()
expect(page.locator(".center-lamp-control h2")).to_contain_text("Center Lamp")
expect(page.locator(".control-panel h2")).to_contain_text("Selected Region")
def test_ui_elements_desktop(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
# Check for main title
expect(page).to_have_title("Pupilometer Unified Control")
# Check that tabs are NOT visible on desktop
expect(page.locator(".tabs")).not_to_be_visible()
# Check for camera view content
expect(page.locator("#camera h2")).to_contain_text("Basler Final Feed")
expect(page.locator("#fps-counter")).to_be_visible()
expect(page.locator("#camera .camera-streams-grid .camera-container-individual")).to_have_count(3)
expect(page.locator(".camera-streams-grid .camera-label").first).to_be_visible()
# Check for lamp view content
expect(page.locator("#lamp .container > h2")).to_contain_text("Lamp Matrix Control")
expect(page.locator("#region-select")).to_be_visible()
expect(page.locator(".center-lamp-control h2")).to_contain_text("Center Lamp")
expect(page.locator(".control-panel h2")).to_contain_text("Selected Region")

@@ -0,0 +1,126 @@
import re
from playwright.sync_api import Page, expect
def test_visual_regression_desktop(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_desktop.png")
def test_visual_regression_tablet(page: Page):
page.set_viewport_size({"width": 768, "height": 1024}) # Common tablet size
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_tablet.png")
def test_visual_regression_mobile(page: Page):
page.set_viewport_size({"width": 375, "height": 667})
page.goto("http://localhost:5000/")
page.screenshot(path="src/unified_web_ui/tests/screenshots/screenshot_mobile.png")
def test_camera_layout_dimensions(page: Page):
page.set_viewport_size({"width": 1280, "height": 720})
page.goto("http://localhost:5000/")
# Wait for camera streams to load
page.wait_for_selector('img[src*="video_feed"]')
# Get bounding boxes for the key layout elements
camera_streams_grid_box = page.locator('#camera .camera-streams-grid').bounding_box()
color_camera_row_box = page.locator('#camera .camera-color-row').bounding_box()
mono_camera_row_box = page.locator('#camera .camera-mono-row').bounding_box()
assert camera_streams_grid_box is not None, "Camera streams grid not found"
assert color_camera_row_box is not None, "Color camera row not found"
assert mono_camera_row_box is not None, "Mono camera row not found"
# Define a small tolerance for floating point comparisons
tolerance = 7 # pixels, increased slightly for robust testing across browsers/OS
# 1. Check vertical positioning and 1/3, 2/3 height distribution
# The grid's 1fr 2fr distribution applies to the space *after* accounting for gaps.
grid_internal_gap_height = 10 # Defined in .camera-streams-grid gap property
total_distributable_height = camera_streams_grid_box['height'] - grid_internal_gap_height
expected_color_row_height = total_distributable_height / 3
expected_mono_row_height = total_distributable_height * 2 / 3
assert abs(color_camera_row_box['height'] - expected_color_row_height) < tolerance, \
f"Color camera row height is {color_camera_row_box['height']}, expected {expected_color_row_height} (1/3 of distributable height)"
assert abs(mono_camera_row_box['height'] - expected_mono_row_height) < tolerance, \
f"Mono camera row height is {mono_camera_row_box['height']}, expected {expected_mono_row_height} (2/3 of distributable height)"
# Check vertical stacking - top of mono row should be roughly at bottom of color row + gap
assert abs(mono_camera_row_box['y'] - (color_camera_row_box['y'] + color_camera_row_box['height'] + grid_internal_gap_height)) < tolerance, \
"Mono camera row is not positioned correctly below the color camera row with the expected gap."
# 2. Check horizontal padding (5px on each side of .camera-streams-grid)
grid_left_edge = camera_streams_grid_box['x']
grid_right_edge = camera_streams_grid_box['x'] + camera_streams_grid_box['width']
color_row_left_edge = color_camera_row_box['x']
color_row_right_edge = color_camera_row_box['x'] + color_camera_row_box['width']
mono_row_left_edge = mono_camera_row_box['x']
mono_row_right_edge = mono_camera_row_box['x'] + mono_camera_row_box['width']
# The content rows should align with the grid's padding
assert abs(color_row_left_edge - (grid_left_edge + 5)) < tolerance, \
f"Color camera row left edge is {color_row_left_edge}, expected {grid_left_edge + 5} (grid left + 5px padding)"
assert abs(grid_right_edge - color_row_right_edge - 5) < tolerance, \
f"Color camera row right edge is {color_row_right_edge}, expected {grid_right_edge - 5} (grid right - 5px padding)"
assert abs(mono_row_left_edge - (grid_left_edge + 5)) < tolerance, \
f"Mono camera row left edge is {mono_row_left_edge}, expected {grid_left_edge + 5} (grid left + 5px padding)"
assert abs(grid_right_edge - mono_row_right_edge - 5) < tolerance, \
f"Mono camera row right edge is {mono_row_right_edge}, expected {grid_right_edge - 5} (grid right - 5px padding)"
# 3. Verify no "behind" effect - check if mono camera row box's top is below color camera row's bottom
# This is implicitly covered by the vertical stacking check, but can be explicit for clarity
assert mono_camera_row_box['y'] > color_camera_row_box['y'] + color_camera_row_box['height'], \
"Mono camera row is visually overlapping the color camera row."
# 4. Check that individual camera containers tightly wrap their images
color_cam_container = page.locator('.camera-color-row .camera-container-individual')
color_cam_img = color_cam_container.locator('.camera-stream-individual')
if color_cam_container.count() > 0:
color_container_box = color_cam_container.bounding_box()
color_img_box = color_cam_img.bounding_box()
assert color_container_box is not None, "Color camera container not found for image fit check"
assert color_img_box is not None, "Color camera image not found for image fit check"
assert abs(color_container_box['width'] - color_img_box['width']) < tolerance, \
f"Color camera container width ({color_container_box['width']}) does not match image width ({color_img_box['width']})"
assert abs(color_container_box['height'] - color_img_box['height']) < tolerance, \
f"Color camera container height ({color_container_box['height']}) does not match image height ({color_img_box['height']})"
mono_cam_containers = page.locator('#camera .camera-mono-row .camera-container-individual').all()
for i, mono_cam_container in enumerate(mono_cam_containers):
mono_cam_img = mono_cam_container.locator('.camera-stream-individual')
mono_container_box = mono_cam_container.bounding_box()
mono_img_box = mono_cam_img.bounding_box()
assert mono_container_box is not None, f"Mono camera container {i} not found for image fit check"
assert mono_img_box is not None, f"Mono camera image {i} not found for image fit check"
assert abs(mono_container_box['width'] - mono_img_box['width']) < tolerance, \
f"Mono camera container {i} width ({mono_container_box['width']}) does not match image width ({mono_img_box['width']})"
assert abs(mono_container_box['height'] - mono_img_box['height']) < tolerance, \
f"Mono camera container {i} height ({mono_container_box['height']}) does not match image height ({mono_img_box['height']})"
# Optionally, check that individual mono cameras are side-by-side within their row
mono_cams = page.locator('#camera .camera-mono').all()
assert len(mono_cams) == 2, "Expected two mono cameras"
if len(mono_cams) == 2:
mono_cam_1_box = mono_cams[0].bounding_box()
mono_cam_2_box = mono_cams[1].bounding_box()
assert mono_cam_1_box is not None and mono_cam_2_box is not None, "Mono camera boxes not found"
# Check horizontal alignment
assert abs(mono_cam_1_box['y'] - mono_cam_2_box['y']) < tolerance, \
"Mono cameras are not horizontally aligned."
# Check side-by-side positioning (cam 2 should be to the right of cam 1)
assert mono_cam_2_box['x'] > mono_cam_1_box['x'] + mono_cam_1_box['width'] - tolerance, \
"Mono cameras are not side-by-side as expected."
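The 1fr/2fr height assertions in the layout test distribute the grid height only after subtracting the single internal gap. That arithmetic, factored into a standalone helper (name is mine, for illustration):

```python
def expected_row_heights(grid_height, gap=10):
    """Split a two-row 1fr/2fr CSS grid's height into the expected
    color-row and mono-row heights, after removing the one internal
    gap between the rows (10px in .camera-streams-grid)."""
    distributable = grid_height - gap
    return distributable / 3, distributable * 2 / 3
```

For a 610px grid, the distributable height is 600px, giving a 200px color row and a 400px mono row, matching what the bounding-box assertions check against.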

tests/conftest.py (new file)
@@ -0,0 +1,21 @@
import pytest
from pypylon import pylon
@pytest.fixture(scope="session")
def camera_available():
"""
Pytest fixture that checks for a connected Basler camera.
If no camera is found, it skips the tests that depend on this fixture.
"""
try:
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if not devices:
pytest.skip("No Basler camera found. Skipping tests that require a camera.")
# You can also add a photo capture test here if you want
# For now, just detecting the camera is enough
except Exception as e:
pytest.fail(f"An error occurred during camera detection: {e}")

@@ -0,0 +1,52 @@
import pytest
from pypylon import pylon
import cv2
@pytest.mark.usefixtures("camera_available")
def test_capture_photo():
"""
Tests that a photo can be captured from the Basler camera.
This test depends on the `camera_available` fixture in conftest.py.
"""
try:
# Get the transport layer factory.
tl_factory = pylon.TlFactory.GetInstance()
# Get all attached devices and exit application if no device is found.
devices = tl_factory.EnumerateDevices()
# Only grab from the first camera found
camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
camera.Open()
# Max number of images to grab
countOfImagesToGrab = 1
# Create an image format converter
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
# Grab a fixed number of images (countOfImagesToGrab)
camera.StartGrabbingMax(countOfImagesToGrab)
img = None
while camera.IsGrabbing():
grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
# Access the image data
image = converter.Convert(grabResult)
img = image.GetArray()
grabResult.Release()
camera.Close()
assert img is not None, "Failed to capture an image."
assert img.shape[0] > 0, "Captured image has zero height."
assert img.shape[1] > 0, "Captured image has zero width."
except Exception as e:
pytest.fail(f"An error occurred during photo capture: {e}")

tests/test_e2e.py Normal file
@@ -0,0 +1,133 @@
import pytest
import subprocess
import time
import requests
import os
import sys
from playwright.sync_api import Page, expect

# Define the host and port for the application
HOST = "127.0.0.1"
PORT = 5000
BASE_URL = f"http://{HOST}:{PORT}"
STDOUT_FILE = "app_stdout.log"
STDERR_FILE = "app_stderr.log"

@pytest.fixture(scope="module")
def run_app():
    """
    Fixture to run the Flask application in a test environment.
    """
    # Select the mock backend for the subprocess.
    env = os.environ.copy()
    env["PUPILOMETER_ENV"] = "test"
    command = [sys.executable, "-u", "app.py"]
    with open(STDOUT_FILE, "w") as stdout_f, open(STDERR_FILE, "w") as stderr_f:
        process = subprocess.Popen(
            command,
            cwd="src/controllerSoftware",
            stdout=stdout_f,
            stderr=stderr_f,
            text=True,
            env=env,
        )
    # Wait for the app to start by polling the Flask startup banner in stderr.
    start_time = time.time()
    while True:
        if os.path.exists(STDERR_FILE):
            with open(STDERR_FILE, "r") as f:
                if "* Running on http" in f.read():
                    break
        if time.time() - start_time > 15:
            process.terminate()
            raise TimeoutError("Flask app failed to start in time.")
        time.sleep(0.5)
    yield process
    process.terminate()
    process.wait()
    # Dump the captured output for debugging, then clean up the log files.
    with open(STDOUT_FILE, "r") as f:
        print("App STDOUT:\n", f.read())
    with open(STDERR_FILE, "r") as f:
        print("App STDERR:\n", f.read())
    os.remove(STDOUT_FILE)
    os.remove(STDERR_FILE)
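The fixture above waits for readiness by grepping the Flask banner out of a stderr log. An alternative, sketched below with only the standard library, is to poll the HTTP port directly until the server answers; `wait_until_ready` and its parameters are hypothetical names, not part of the repository.

```python
import time
import urllib.request
import urllib.error

def wait_until_ready(url, timeout=15.0, interval=0.5):
    """Poll `url` until the server answers, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                # Any response (even 4xx, raised as HTTPError and retried
                # below) means the socket is accepting connections.
                if resp.status < 500:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # Not up yet; retry after a short sleep.
        time.sleep(interval)
    raise TimeoutError(f"Server at {url} did not become ready within {timeout}s")
```

This avoids depending on Werkzeug's exact log wording, which can change between Flask versions.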
def test_program_output(run_app):
    """
    Tests that the mock backend is initialized.
    """
    with open(STDERR_FILE, "r") as f:
        stderr = f.read()
    assert "Initializing Mock backend" in stderr
    assert "MockBackend initialized." in stderr

def test_curl_output(run_app):
    """
    Tests the API endpoints using requests (similar to curl).
    """
    # Test the /ble_status endpoint
    response_ble = requests.get(f"{BASE_URL}/ble_status")
    assert response_ble.status_code == 200
    assert response_ble.json() == {"connected": True}  # In DEBUG_MODE
    # Test the /vision/pupil_data endpoint
    response_vision = requests.get(f"{BASE_URL}/vision/pupil_data")
    assert response_vision.status_code == 200
    body = response_vision.json()
    assert "data" in body
    assert "success" in body
def test_playwright_checks(page: Page, run_app):
    """
    Performs basic and visual checks using Playwright.
    """
    page.goto(BASE_URL)
    # Basic output check: title and heading
    expect(page).to_have_title("Lamp Matrix Control")
    heading = page.locator("h1")
    expect(heading).to_have_text("Lamp Matrix Control")
    # Pupil detection UI check
    pupil_detection_section = page.locator("#pupil-detection")
    expect(pupil_detection_section).to_be_visible()
    expect(pupil_detection_section.locator("h2")).to_have_text("Pupil Detection")
    pupil_canvas = page.locator("#pupil-canvas")
    expect(pupil_canvas).to_be_visible()
    pupil_center = page.locator("#pupil-center")
    pupil_area = page.locator("#pupil-area")
    expect(pupil_center).to_be_visible()
    expect(pupil_area).to_be_visible()
    # Wait for the pupil data to be updated past its placeholder values
    time.sleep(1)
    expect(pupil_center).not_to_have_text("(x, y)")
    expect(pupil_area).not_to_have_text("0")
    # Camera stream UI check
    camera_feed_section = page.locator("#video-feed")
    expect(camera_feed_section).to_be_visible()
    expect(camera_feed_section.locator("h2")).to_have_text("Camera Feed")
    video_feed_img = page.locator("#video-feed img")
    expect(video_feed_img).to_be_visible()
    expect(video_feed_img).to_have_attribute("src", "/video_feed")
    # Visual check: screenshot
    os.makedirs("screenshots", exist_ok=True)
    screenshot_path = "screenshots/homepage.png"
    page.screenshot(path=screenshot_path)
    assert os.path.exists(screenshot_path)

@@ -0,0 +1,18 @@
from playwright.sync_api import Page, expect

def test_segmentation_output(page: Page):
    page.goto("http://localhost:5000/")
    # Check for the segmentation feed of the first mono camera (stream 1)
    segmentation_feed = page.locator("#segmentation-feed-1")
    expect(segmentation_feed).to_be_visible()
    # Verify that the cache-busted src changes across a reload,
    # i.e. the feed is being re-requested rather than served stale.
    initial_src = segmentation_feed.get_attribute("src")
    page.reload()
    page.wait_for_selector("#segmentation-feed-1")
    new_src = segmentation_feed.get_attribute("src")
    assert initial_src != new_src, "Segmentation feed is not updating"
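The reload check above relies on the cache-busting the commit adds to every video stream, presumably a changing query parameter on the feed URL. A minimal sketch of extracting and comparing that parameter with the standard library; the parameter name `"t"` and the helper `cache_buster` are assumptions for illustration, not the template's actual names.

```python
from urllib.parse import urlparse, parse_qs

def cache_buster(src):
    """Return the cache-busting query value of a feed URL, or None."""
    query = parse_qs(urlparse(src).query)
    # "t" is a hypothetical parameter name; the real template may use another key.
    values = query.get("t", [])
    return values[0] if values else None

# Two renders of the same feed should carry different cache-busters:
assert cache_buster("/segmentation_feed/1?t=1001") != cache_buster("/segmentation_feed/1?t=1002")
assert cache_buster("/segmentation_feed/1") is None
```

Comparing the parsed parameter rather than the whole `src` string would keep the test stable if the path itself ever changes.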

tests/test_vision.py Normal file
@@ -0,0 +1,135 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
import numpy as np

# Add the src/controllerSoftware directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware')))

# Mock the gi module so vision can be imported without GStreamer installed
sys.modules['gi'] = MagicMock()
sys.modules['gi.repository'] = MagicMock()

from vision import VisionSystem, DeepStreamBackend, PythonBackend, MockBackend

class TestVisionSystem(unittest.TestCase):
    """
    Unit tests for the VisionSystem class.
    """
    def setUp(self):
        """
        Set up a shared configuration for each test.
        """
        self.config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"}

    @patch('platform.system', return_value='Linux')
    @patch('vision.DeepStreamBackend')
    def test_initialization_linux(self, mock_backend_class, mock_system):
        """
        Test that the VisionSystem initializes the DeepStreamBackend on Linux.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        expected_config = self.config.copy()
        expected_config.setdefault('model_name', 'yolov8n-seg.pt')  # Add default model_name
        mock_backend_class.assert_called_once_with(expected_config)
        self.assertEqual(vision_system._backend, mock_backend_instance)

    @patch('platform.system', return_value='Windows')
    @patch('vision.DeepStreamBackend')
    def test_initialization_windows(self, mock_backend_class, mock_system):
        """
        Test that the VisionSystem initializes the DeepStreamBackend on Windows.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        expected_config = self.config.copy()
        expected_config.setdefault('model_name', 'yolov8n-seg.pt')  # Add default model_name
        mock_backend_class.assert_called_once_with(expected_config)
        self.assertEqual(vision_system._backend, mock_backend_instance)

    @patch('platform.system', return_value='Darwin')
    @patch('vision.PythonBackend')
    def test_initialization_macos(self, mock_backend_class, mock_system):
        """
        Test that the VisionSystem initializes the PythonBackend on macOS.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        expected_config = self.config.copy()
        expected_config.setdefault('model_name', 'yolov8n-seg.pt')  # Add default model_name
        mock_backend_class.assert_called_once_with(expected_config)
        self.assertEqual(vision_system._backend, mock_backend_instance)

    @patch('platform.system', return_value='UnsupportedOS')
    def test_initialization_unsupported(self, mock_system):
        """
        Test that the VisionSystem raises an exception on an unsupported OS.
        """
        with self.assertRaises(NotImplementedError):
            VisionSystem(self.config)

    @patch('platform.system', return_value='Linux')
    @patch('vision.DeepStreamBackend')
    def test_start(self, mock_backend_class, mock_system):
        """
        Test that the start method calls the backend's start method.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        vision_system.start()
        mock_backend_instance.start.assert_called_once()

    @patch('platform.system', return_value='Linux')
    @patch('vision.DeepStreamBackend')
    def test_stop(self, mock_backend_class, mock_system):
        """
        Test that the stop method calls the backend's stop method.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        vision_system.stop()
        mock_backend_instance.stop.assert_called_once()

    @patch('platform.system', return_value='Linux')
    @patch('vision.DeepStreamBackend')
    def test_get_pupil_data(self, mock_backend_class, mock_system):
        """
        Test that the get_pupil_data method calls the backend's get_pupil_data method.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        vision_system.get_pupil_data()
        mock_backend_instance.get_pupil_data.assert_called_once()

    @patch('platform.system', return_value='Linux')
    @patch('vision.DeepStreamBackend')
    def test_get_annotated_frame(self, mock_backend_class, mock_system):
        """
        Test that the get_annotated_frame method calls the backend's get_annotated_frame method.
        """
        mock_backend_instance = mock_backend_class.return_value
        vision_system = VisionSystem(self.config)
        vision_system.get_annotated_frame()
        mock_backend_instance.get_annotated_frame.assert_called_once()

    def test_mock_backend_methods(self):
        """
        Test the methods of the MockBackend.
        """
        backend = MockBackend(self.config)
        backend.start()
        backend.stop()
        data = backend.get_pupil_data()
        self.assertIn("pupil_position", data)
        frame = backend.get_annotated_frame()
        self.assertIsInstance(frame, np.ndarray)

    def test_model_exists(self):
        """
        Tests that the YOLO model file (.pt) exists at the expected location.
        """
        model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware', self.config['model_path']))
        self.assertTrue(os.path.exists(model_path), f"YOLO model file not found at {model_path}")
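Taken together, these tests pin down the backend interface: `start`, `stop`, `get_pupil_data` (a dict with a `"pupil_position"` key), and `get_annotated_frame` (an `ndarray`). A minimal stand-in conforming to that contract can be sketched as follows; the class name `StubBackend`, the fixed pupil values, and the 480x640 frame shape are illustrative assumptions, not the repository's `MockBackend`.

```python
import numpy as np

class StubBackend:
    """Minimal stand-in exposing the interface the VisionSystem tests exercise."""

    def __init__(self, config):
        self.config = config
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

    def get_pupil_data(self):
        # Key name matches the assertion in test_mock_backend_methods;
        # the values themselves are arbitrary placeholders.
        return {"pupil_position": (320, 240), "pupil_area": 0}

    def get_annotated_frame(self):
        # A blank BGR frame; the shape is an assumption for illustration.
        return np.zeros((480, 640, 3), dtype=np.uint8)

backend = StubBackend({"camera_id": 0})
backend.start()
assert backend.running
assert "pupil_position" in backend.get_pupil_data()
assert backend.get_annotated_frame().shape == (480, 640, 3)
```

Keeping the contract this small is what lets the platform-selection tests above swap backends with a single `@patch`.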