Temporary Commit

This commit is contained in:
Tempest 2025-11-28 16:18:30 +07:00
parent 7d5b283dd3
commit 17d691173b
7 changed files with 252 additions and 126 deletions

View File

@ -1,9 +1,65 @@
bleak==2.0.0
blinker==1.9.0
certifi==2025.11.12
charset-normalizer==3.4.4
click==8.3.1
colorama==0.4.6
coloredlogs==15.0.1
contourpy==1.3.3
cycler==0.12.1
filelock==3.20.0
Flask==3.1.2
flatbuffers==25.9.23
fonttools==4.60.1
fsspec==2025.10.0
greenlet==3.2.4
humanfriendly==10.0
idna==3.11
iniconfig==2.3.0
itsdangerous==2.2.0
Jinja2==3.1.6
kiwisolver==1.4.9
MarkupSafe==3.0.3
matplotlib==3.10.7
ml_dtypes==0.5.4
mpmath==1.3.0
networkx==3.6
numpy==1.26.4
onnx==1.19.1
onnxruntime==1.23.2
onnxslim==0.1.77
opencv-python==4.12.0.88
packaging==25.0
pillow==12.0.0
playwright==1.56.0
pluggy==1.6.0
polars==1.35.2
polars-runtime-32==1.35.2
protobuf==6.33.1
psutil==7.1.3
pyee==13.0.0
Pygments==2.19.2
pyobjc-core==12.1
pyobjc-framework-Cocoa==12.1
pyobjc-framework-CoreBluetooth==12.1
pyobjc-framework-libdispatch==12.1
pyparsing==3.2.5
pypylon==4.2.0
pytest==9.0.1
pytest-base-url==2.1.0
pytest-playwright==0.7.2
python-dateutil==2.9.0.post0
python-slugify==8.0.4
PyYAML==6.0.3
requests==2.32.5
scipy==1.16.3
six==1.17.0
sympy==1.14.0
text-unidecode==1.3
torch==2.2.2
torchvision==0.17.2
typing_extensions==4.15.0
ultralytics==8.3.233
ultralytics-thop==2.0.18
urllib3==2.5.0
Werkzeug==3.1.3

0
run.sh Normal file → Executable file
View File

View File

@ -341,7 +341,7 @@ if __name__ == '__main__':
# Initialize and start the Vision System # Initialize and start the Vision System
try: try:
vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"} vision_config = {"camera_id": 0, "model_name": "yolov8n-seg.pt"}
vision_system = VisionSystem(config=vision_config) vision_system = VisionSystem(config=vision_config)
vision_system.start() vision_system.start()
except Exception as e: except Exception as e:

View File

@ -4,6 +4,7 @@ import os
import numpy as np import numpy as np
import cv2 import cv2
import logging import logging
from ultralytics import YOLO # New import
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@ -15,7 +16,10 @@ class VisionSystem:
""" """
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config.copy()
self.config.setdefault('model_name', 'yolov8n-seg.pt') # Set default model
# Ensure model_path in config points to the selected model_name
self.config['model_path'] = self.config['model_name']
self._backend = self._initialize_backend() self._backend = self._initialize_backend()
def _initialize_backend(self): def _initialize_backend(self):
@ -152,7 +156,7 @@ class DeepStreamBackend:
class PythonBackend: class PythonBackend:
""" """
A class to handle pupil segmentation on macOS using pypylon and ONNX Runtime. A class to handle pupil segmentation on macOS using pypylon and Ultralytics YOLO models.
""" """
def __init__(self, config): def __init__(self, config):
@ -165,26 +169,28 @@ class PythonBackend:
""" """
self.config = config self.config = config
self.camera = None self.camera = None
self.inference_session = None self.model = None # Ultralytics YOLO model
self.annotated_frame = None self.annotated_frame = None
self.conf_threshold = 0.25 # Confidence threshold for object detection
self.iou_threshold = 0.45 # IoU threshold for Non-Maximum Suppression
# Load the YOLO model (e.g., yolov8n-seg.pt)
try: try:
import onnxruntime as ort model_full_path = os.path.join(os.path.dirname(__file__), self.config['model_path'])
if 'CUDAExecutionProvider' in ort.get_available_providers(): self.model = YOLO(model_full_path)
logging.info("CUDA is available. Using onnxruntime-gpu.") logging.info(f"PythonBackend: Ultralytics YOLO model loaded from {model_full_path}.")
self.ort = ort # Dynamically get class names from the model
else: self.class_names = self.model.names
raise ImportError("CUDAExecutionProvider not found.") except Exception as e:
except ImportError: logging.error(f"PythonBackend: Error loading Ultralytics YOLO model: {e}")
logging.warning("onnxruntime-gpu is not available or CUDA is not configured. Falling back to onnxruntime (CPU).") self.model = None
import onnxruntime as ort self.class_names = [] # Fallback to empty list
self.ort = ort
logging.info("PythonBackend initialized.") logging.info("PythonBackend initialized.")
def start(self): def start(self):
""" """
Initializes the Basler camera and loads the ONNX model. Initializes the Basler camera.
""" """
try: try:
from pypylon import pylon from pypylon import pylon
@ -202,14 +208,6 @@ class PythonBackend:
logging.error(f"PythonBackend: Error opening Basler camera: {e}") logging.error(f"PythonBackend: Error opening Basler camera: {e}")
self.camera = None self.camera = None
try:
# Load the ONNX model
self.inference_session = self.ort.InferenceSession(self.config['model_path'])
logging.info(f"PythonBackend: ONNX model loaded from {self.config['model_path']}.")
except Exception as e:
logging.error(f"PythonBackend: Error loading ONNX model: {e}")
self.inference_session = None
logging.info("PythonBackend started.") logging.info("PythonBackend started.")
def stop(self): def stop(self):
@ -224,78 +222,92 @@ class PythonBackend:
logging.info("PythonBackend: Basler camera closed.") logging.info("PythonBackend: Basler camera closed.")
logging.info("PythonBackend stopped.") logging.info("PythonBackend stopped.")
def _postprocess_output(self, outputs, original_image_shape):
"""
Post-processes the raw output from the YOLOv10 model.
Args:
outputs (list): A list of numpy arrays representing the model's output.
original_image_shape (tuple): The shape of the original image (height, width).
Returns:
dict: A dictionary containing the processed pupil data.
"""
# TODO: Implement the actual post-processing logic.
# This will involve non-maximum suppression (NMS) and parsing the
# bounding boxes and segmentation masks.
logging.info("Post-processing model output...")
pupil_data = {
"raw_model_output_shape": [o.shape for o in outputs],
"pupil_position": (100, 120), # Placeholder
"pupil_diameter": 30, # Placeholder
"bounding_box": [50, 70, 150, 170] # Placeholder [x1, y1, x2, y2]
}
return pupil_data
def get_pupil_data(self): def get_pupil_data(self):
""" """
Grabs a frame from the camera, runs inference, and returns pupil data. Grabs a frame from the camera, runs inference using Ultralytics YOLO, and returns pupil data.
""" """
if not self.camera or not self.camera.IsGrabbing(): if not self.camera or not self.camera.IsGrabbing():
logging.warning("PythonBackend: Camera not ready.") logging.warning("PythonBackend: Camera not ready.")
return None return None
if not self.inference_session: if not self.model:
logging.warning("PythonBackend: Inference session not ready.") logging.warning("PythonBackend: YOLO model not loaded.")
return None return None
grab_result = None grab_result = None
try: try:
from pypylon import pylon
import cv2 import cv2
import numpy as np import numpy as np
from pypylon import pylon
grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException) grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded(): if grab_result.GrabSucceeded():
image = grab_result.Array image_np = grab_result.Array # This is typically a grayscale image from Basler
original_shape = image.shape
# Image preprocessing # Convert grayscale to BGR if necessary for YOLO (YOLO expects 3 channels)
if len(image.shape) == 2: if len(image_np.shape) == 2:
image = cv2.cvtColor(image, cv2.COLOR_BAYER_BG2RGB) image_bgr = cv2.cvtColor(image_np, cv2.COLOR_GRAY2BGR)
else:
image_bgr = image_np
input_shape = (640, 640) # Run inference with Ultralytics YOLO
resized_image = cv2.resize(image, input_shape) results = self.model.predict(source=image_bgr, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False)
normalized_image = resized_image.astype(np.float32) / 255.0
transposed_image = np.transpose(normalized_image, (2, 0, 1))
input_tensor = np.expand_dims(transposed_image, axis=0)
# Run inference pupil_data = {}
input_name = self.inference_session.get_inputs()[0].name self.annotated_frame = image_bgr.copy() # Start with original image for annotation
output_names = [o.name for o in self.inference_session.get_outputs()]
outputs = self.inference_session.run(output_names, {input_name: input_tensor})
# Post-process the output if results and len(results[0].boxes) > 0: # Check if any detections are made
pupil_data = self._postprocess_output(outputs, original_shape) # Assuming we are interested in the largest or most confident pupil
# For simplicity, let's process the first detection
result = results[0] # Results for the first (and only) image
# Draw segmentation on the frame # Extract bounding box
annotated_frame = image.copy() box = result.boxes.xyxy[0].cpu().numpy().astype(int) # xyxy format
if pupil_data and "bounding_box" in pupil_data: x1, y1, x2, y2 = box
x1, y1, x2, y2 = pupil_data["bounding_box"]
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # Extract confidence and class ID
self.annotated_frame = annotated_frame confidence = result.boxes.conf[0].cpu().numpy().item()
class_id = int(result.boxes.cls[0].cpu().numpy().item())
class_name = self.class_names[class_id]
# Calculate pupil position (center of bounding box)
pupil_center_x = (x1 + x2) // 2
pupil_center_y = (y1 + y2) // 2
# Calculate pupil diameter (average of width and height of bounding box)
pupil_diameter = (x2 - x1 + y2 - y1) // 2
pupil_data = {
"pupil_position": (pupil_center_x, pupil_center_y),
"pupil_diameter": pupil_diameter,
"class_name": class_name,
"confidence": confidence,
"bounding_box": box.tolist() # Convert numpy array to list for JSON serialization
}
# Extract and draw segmentation mask
if result.masks:
# Get the mask for the first detection, upsampled to original image size
mask_np = result.masks.data[0].cpu().numpy() # Raw mask data
# Resize mask to original image dimensions if necessary (ultralytics usually returns scaled masks)
mask_resized = cv2.resize(mask_np, (image_bgr.shape[1], image_bgr.shape[0]), interpolation=cv2.INTER_LINEAR)
binary_mask = (mask_resized > 0.5).astype(np.uint8) * 255 # Threshold to binary
# Draw bounding box
color = (0, 255, 0) # Green for pupil detection
cv2.rectangle(self.annotated_frame, (x1, y1), (x2, y2), color, 2)
# Create a colored mask overlay
mask_color = np.array([0, 255, 0], dtype=np.uint8) # Green color for mask
colored_mask_overlay = np.zeros_like(self.annotated_frame, dtype=np.uint8)
colored_mask_overlay[binary_mask > 0] = mask_color
self.annotated_frame = cv2.addWeighted(self.annotated_frame, 1, colored_mask_overlay, 0.5, 0)
# Draw label
label = f"{class_name}: {confidence:.2f}"
cv2.putText(self.annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
else:
logging.info("No objects detected by YOLO model.")
return pupil_data return pupil_data
else: else:
@ -314,9 +326,11 @@ class PythonBackend:
""" """
return self.annotated_frame return self.annotated_frame
# The if __name__ == '__main__': block should be outside the class
if __name__ == '__main__': if __name__ == '__main__':
# Example usage # Example usage
config = {"camera_id": 0, "model_path": "yolov10.onnx"} # Ensure 'yolov8n-seg.pt' is in src/controllerSoftware for this example to run
config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"}
try: try:
vision_system = VisionSystem(config) vision_system = VisionSystem(config)
@ -324,7 +338,10 @@ if __name__ == '__main__':
# In a real application, this would run in a loop # In a real application, this would run in a loop
pupil_data = vision_system.get_pupil_data() pupil_data = vision_system.get_pupil_data()
if pupil_data:
logging.info(f"Received pupil data: {pupil_data}") logging.info(f"Received pupil data: {pupil_data}")
else:
logging.info("No pupil data received.")
# Get and show the annotated frame # Get and show the annotated frame
annotated_frame = vision_system.get_annotated_frame() annotated_frame = vision_system.get_annotated_frame()
@ -339,4 +356,3 @@ if __name__ == '__main__':
logging.error(e) logging.error(e)
except Exception as e: except Exception as e:
logging.error(f"An error occurred: {e}") logging.error(f"An error occurred: {e}")

21
tests/conftest.py Normal file
View File

@ -0,0 +1,21 @@
import pytest
from pypylon import pylon
@pytest.fixture(scope="session")
def camera_available():
    """
    Session-scoped fixture that checks for a connected Basler camera.

    Skips the dependent tests when no camera is attached; fails them when
    device enumeration itself raises an error.
    """
    try:
        # Keep the try body minimal: only the enumeration call can raise here.
        devices = pylon.TlFactory.GetInstance().EnumerateDevices()
    except Exception as e:
        pytest.fail(f"An error occurred during camera detection: {e}")
    # The skip lives outside the try so pytest's control-flow exception can
    # never be intercepted by the broad except above, regardless of how
    # pytest implements it.
    if not devices:
        pytest.skip("No Basler camera found. Skipping tests that require a camera.")

View File

@ -0,0 +1,52 @@
import pytest
from pypylon import pylon
import cv2
@pytest.mark.usefixtures("camera_available")
def test_capture_photo():
    """
    Tests that a photo can be captured from the Basler camera.

    Depends on the `camera_available` fixture in conftest.py. Grabs a single
    frame from the first attached device, converts it to packed BGR, and
    checks that the resulting array is a non-empty image.
    """
    img = None
    camera = None
    try:
        # Get the transport layer factory and the attached devices.
        tl_factory = pylon.TlFactory.GetInstance()
        devices = tl_factory.EnumerateDevices()

        # Only grab from the first camera found.
        camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
        camera.Open()

        # Convert whatever pixel format the camera delivers to 8-bit BGR.
        converter = pylon.ImageFormatConverter()
        converter.OutputPixelFormat = pylon.PixelType_BGR8packed
        converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

        # Grab exactly one image.
        camera.StartGrabbingMax(1)
        while camera.IsGrabbing():
            grab_result = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
            try:
                if grab_result.GrabSucceeded():
                    img = converter.Convert(grab_result).GetArray()
            finally:
                # Release the grab buffer even if conversion raises.
                grab_result.Release()
    except Exception as e:
        pytest.fail(f"An error occurred during photo capture: {e}")
    finally:
        # Always close the camera, even when grabbing/conversion raised,
        # so a failed test does not leave the device locked open.
        if camera is not None and camera.IsOpen():
            camera.Close()

    # Assertions stay outside the try so a failure is reported as a genuine
    # AssertionError with its message, not re-wrapped by pytest.fail().
    assert img is not None, "Failed to capture an image."
    assert img.shape[0] > 0, "Captured image has zero height."
    assert img.shape[1] > 0, "Captured image has zero width."

View File

@ -22,7 +22,7 @@ class TestVisionSystem(unittest.TestCase):
""" """
Set up a VisionSystem instance with a mocked backend for each test. Set up a VisionSystem instance with a mocked backend for each test.
""" """
self.config = {"camera_id": 0, "model_path": "yolov10.onnx"} self.config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"}
@patch('platform.system', return_value='Linux') @patch('platform.system', return_value='Linux')
@patch('vision.DeepStreamBackend') @patch('vision.DeepStreamBackend')
@ -32,7 +32,9 @@ class TestVisionSystem(unittest.TestCase):
""" """
mock_backend_instance = mock_backend_class.return_value mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config) vision_system = VisionSystem(self.config)
mock_backend_class.assert_called_once_with(self.config) expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance) self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='Windows') @patch('platform.system', return_value='Windows')
@ -43,7 +45,9 @@ class TestVisionSystem(unittest.TestCase):
""" """
mock_backend_instance = mock_backend_class.return_value mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config) vision_system = VisionSystem(self.config)
mock_backend_class.assert_called_once_with(self.config) expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance) self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='Darwin') @patch('platform.system', return_value='Darwin')
@ -54,7 +58,9 @@ class TestVisionSystem(unittest.TestCase):
""" """
mock_backend_instance = mock_backend_class.return_value mock_backend_instance = mock_backend_class.return_value
vision_system = VisionSystem(self.config) vision_system = VisionSystem(self.config)
mock_backend_class.assert_called_once_with(self.config) expected_config = self.config.copy()
expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name
mock_backend_class.assert_called_once_with(expected_config)
self.assertEqual(vision_system._backend, mock_backend_instance) self.assertEqual(vision_system._backend, mock_backend_instance)
@patch('platform.system', return_value='UnsupportedOS') @patch('platform.system', return_value='UnsupportedOS')
@ -109,35 +115,6 @@ class TestVisionSystem(unittest.TestCase):
vision_system.get_annotated_frame() vision_system.get_annotated_frame()
mock_backend_instance.get_annotated_frame.assert_called_once() mock_backend_instance.get_annotated_frame.assert_called_once()
@patch('vision.logging')
@patch.dict('sys.modules', {'onnxruntime': MagicMock(), 'onnxruntime-gpu': None})
def test_python_backend_cpu_fallback(self, mock_logging):
"""
Test that PythonBackend falls back to CPU when onnxruntime-gpu is not available.
"""
mock_ort = sys.modules['onnxruntime']
mock_ort.get_available_providers.return_value = ['CPUExecutionProvider']
backend = PythonBackend(self.config)
mock_logging.warning.assert_called_with("onnxruntime-gpu is not available or CUDA is not configured. Falling back to onnxruntime (CPU).")
self.assertEqual(backend.ort, mock_ort)
@patch('vision.logging')
@patch.dict('sys.modules', {'onnxruntime': MagicMock()})
def test_python_backend_gpu_selection(self, mock_logging):
"""
Test that PythonBackend selects GPU when onnxruntime-gpu is available.
"""
mock_ort_gpu = MagicMock()
mock_ort_gpu.get_available_providers.return_value = ['CUDAExecutionProvider', 'CPUExecutionProvider']
sys.modules['onnxruntime'] = mock_ort_gpu
backend = PythonBackend(self.config)
mock_logging.info.assert_any_call("CUDA is available. Using onnxruntime-gpu.")
self.assertEqual(backend.ort, mock_ort_gpu)
def test_mock_backend_methods(self): def test_mock_backend_methods(self):
""" """
Test the methods of the MockBackend. Test the methods of the MockBackend.
@ -150,5 +127,9 @@ class TestVisionSystem(unittest.TestCase):
frame = backend.get_annotated_frame() frame = backend.get_annotated_frame()
self.assertIsInstance(frame, np.ndarray) self.assertIsInstance(frame, np.ndarray)
if __name__ == '__main__': def test_model_exists(self):
unittest.main() """
Tests that the YOLO model file (.pt) exists at the expected location.
"""
model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware', self.config['model_path']))
self.assertTrue(os.path.exists(model_path), f"YOLO model file not found at {model_path}")