diff --git a/requirements.txt b/requirements.txt index a6ce2bf2..0abc3a78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,65 @@ -bleak>="1.0.0" -flask>="3.1.1" -pypylon>= "4.0.0" -onnxruntime>= "1.18.0" -onnxruntime-gpu>= "1.18.0" -opencv-python>= "4.9.0" -pytest>= "8.0.0" -pytest-playwright>= "0.4.0" -requests>= "2.31.0" \ No newline at end of file +bleak==2.0.0 +blinker==1.9.0 +certifi==2025.11.12 +charset-normalizer==3.4.4 +click==8.3.1 +colorama==0.4.6 +coloredlogs==15.0.1 +contourpy==1.3.3 +cycler==0.12.1 +filelock==3.20.0 +Flask==3.1.2 +flatbuffers==25.9.23 +fonttools==4.60.1 +fsspec==2025.10.0 +greenlet==3.2.4 +humanfriendly==10.0 +idna==3.11 +iniconfig==2.3.0 +itsdangerous==2.2.0 +Jinja2==3.1.6 +kiwisolver==1.4.9 +MarkupSafe==3.0.3 +matplotlib==3.10.7 +ml_dtypes==0.5.4 +mpmath==1.3.0 +networkx==3.6 +numpy==1.26.4 +onnx==1.19.1 +onnxruntime==1.23.2 +onnxslim==0.1.77 +opencv-python==4.12.0.88 +packaging==25.0 +pillow==12.0.0 +playwright==1.56.0 +pluggy==1.6.0 +polars==1.35.2 +polars-runtime-32==1.35.2 +protobuf==6.33.1 +psutil==7.1.3 +pyee==13.0.0 +Pygments==2.19.2 +pyobjc-core==12.1 +pyobjc-framework-Cocoa==12.1 +pyobjc-framework-CoreBluetooth==12.1 +pyobjc-framework-libdispatch==12.1 +pyparsing==3.2.5 +pypylon==4.2.0 +pytest==9.0.1 +pytest-base-url==2.1.0 +pytest-playwright==0.7.2 +python-dateutil==2.9.0.post0 +python-slugify==8.0.4 +PyYAML==6.0.3 +requests==2.32.5 +scipy==1.16.3 +six==1.17.0 +sympy==1.14.0 +text-unidecode==1.3 +torch==2.2.2 +torchvision==0.17.2 +typing_extensions==4.15.0 +ultralytics==8.3.233 +ultralytics-thop==2.0.18 +urllib3==2.5.0 +Werkzeug==3.1.3 diff --git a/run.sh b/run.sh old mode 100644 new mode 100755 diff --git a/src/controllerSoftware/app.py b/src/controllerSoftware/app.py index 14f43533..52af3508 100644 --- a/src/controllerSoftware/app.py +++ b/src/controllerSoftware/app.py @@ -341,7 +341,7 @@ if __name__ == '__main__': # Initialize and start the Vision System try: - vision_config = {"camera_id": 0, "model_path": "yolov10.onnx"} + vision_config = {"camera_id": 0, "model_name": "yolov8n-seg.pt"} vision_system = VisionSystem(config=vision_config) vision_system.start() except Exception as e: diff --git a/src/controllerSoftware/vision.py b/src/controllerSoftware/vision.py index 7bd903f1..9ac1d82e 100644 --- a/src/controllerSoftware/vision.py +++ b/src/controllerSoftware/vision.py @@ -4,6 +4,7 @@ import os import numpy as np import cv2 import logging +from ultralytics import YOLO # New import # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -15,7 +16,10 @@ class VisionSystem: """ def __init__(self, config): - self.config = config + self.config = config.copy() + self.config.setdefault('model_name', 'yolov8n-seg.pt') # Set default model + # Ensure model_path in config points to the selected model_name + self.config['model_path'] = self.config['model_name'] self._backend = self._initialize_backend() def _initialize_backend(self): @@ -152,7 +156,7 @@ class DeepStreamBackend: class PythonBackend: """ - A class to handle pupil segmentation on macOS using pypylon and ONNX Runtime. + A class to handle pupil segmentation on macOS using pypylon and Ultralytics YOLO models. """ def __init__(self, config): @@ -165,26 +169,28 @@ class PythonBackend: """ self.config = config self.camera = None - self.inference_session = None + self.model = None # Ultralytics YOLO model self.annotated_frame = None - + self.conf_threshold = 0.25 # Confidence threshold for object detection + self.iou_threshold = 0.45 # IoU threshold for Non-Maximum Suppression + + # Load the YOLO model (e.g., yolov8n-seg.pt) try: - import onnxruntime as ort - if 'CUDAExecutionProvider' in ort.get_available_providers(): - logging.info("CUDA is available. Using onnxruntime-gpu.") - self.ort = ort - else: - raise ImportError("CUDAExecutionProvider not found.") - except ImportError: - logging.warning("onnxruntime-gpu is not available or CUDA is not configured. Falling back to onnxruntime (CPU).") - import onnxruntime as ort - self.ort = ort - + model_full_path = os.path.join(os.path.dirname(__file__), self.config['model_path']) + self.model = YOLO(model_full_path) + logging.info(f"PythonBackend: Ultralytics YOLO model loaded from {model_full_path}.") + # Dynamically get class names from the model + self.class_names = self.model.names + except Exception as e: + logging.error(f"PythonBackend: Error loading Ultralytics YOLO model: {e}") + self.model = None + self.class_names = [] # Fallback to empty list + logging.info("PythonBackend initialized.") def start(self): """ - Initializes the Basler camera and loads the ONNX model. + Initializes the Basler camera. """ try: from pypylon import pylon @@ -201,14 +207,6 @@ class PythonBackend: except Exception as e: logging.error(f"PythonBackend: Error opening Basler camera: {e}") self.camera = None - - try: - # Load the ONNX model - self.inference_session = self.ort.InferenceSession(self.config['model_path']) - logging.info(f"PythonBackend: ONNX model loaded from {self.config['model_path']}.") - except Exception as e: - logging.error(f"PythonBackend: Error loading ONNX model: {e}") - self.inference_session = None logging.info("PythonBackend started.") @@ -224,79 +222,93 @@ class PythonBackend: logging.info("PythonBackend: Basler camera closed.") logging.info("PythonBackend stopped.") - def _postprocess_output(self, outputs, original_image_shape): - """ - Post-processes the raw output from the YOLOv10 model. - - Args: - outputs (list): A list of numpy arrays representing the model's output. - original_image_shape (tuple): The shape of the original image (height, width). - - Returns: - dict: A dictionary containing the processed pupil data. - """ - # TODO: Implement the actual post-processing logic. - # This will involve non-maximum suppression (NMS) and parsing the - # bounding boxes and segmentation masks. - - logging.info("Post-processing model output...") - - pupil_data = { - "raw_model_output_shape": [o.shape for o in outputs], - "pupil_position": (100, 120), # Placeholder - "pupil_diameter": 30, # Placeholder - "bounding_box": [50, 70, 150, 170] # Placeholder [x1, y1, x2, y2] - } - return pupil_data - def get_pupil_data(self): """ - Grabs a frame from the camera, runs inference, and returns pupil data. + Grabs a frame from the camera, runs inference using Ultralytics YOLO, and returns pupil data. """ if not self.camera or not self.camera.IsGrabbing(): logging.warning("PythonBackend: Camera not ready.") return None - if not self.inference_session: - logging.warning("PythonBackend: Inference session not ready.") + if not self.model: + logging.warning("PythonBackend: YOLO model not loaded.") return None grab_result = None try: + from pypylon import pylon import cv2 import numpy as np - from pypylon import pylon grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException) if grab_result.GrabSucceeded(): - image = grab_result.Array - original_shape = image.shape + image_np = grab_result.Array # This is typically a grayscale image from Basler - # Image preprocessing - if len(image.shape) == 2: - image = cv2.cvtColor(image, cv2.COLOR_BAYER_BG2RGB) - - input_shape = (640, 640) - resized_image = cv2.resize(image, input_shape) - normalized_image = resized_image.astype(np.float32) / 255.0 - transposed_image = np.transpose(normalized_image, (2, 0, 1)) - input_tensor = np.expand_dims(transposed_image, axis=0) + # Convert grayscale to BGR if necessary for YOLO (YOLO expects 3 channels) + if len(image_np.shape) == 2: + image_bgr = cv2.cvtColor(image_np, cv2.COLOR_GRAY2BGR) + else: + image_bgr = image_np - # Run inference - input_name = self.inference_session.get_inputs()[0].name - output_names = [o.name for o in self.inference_session.get_outputs()] - outputs = self.inference_session.run(output_names, {input_name: input_tensor}) + # Run inference with Ultralytics YOLO + results = self.model.predict(source=image_bgr, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False) - # Post-process the output - pupil_data = self._postprocess_output(outputs, original_shape) + pupil_data = {} + self.annotated_frame = image_bgr.copy() # Start with original image for annotation + + if results and len(results[0].boxes) > 0: # Check if any detections are made + # Assuming we are interested in the largest or most confident pupil + # For simplicity, let's process the first detection + result = results[0] # Results for the first (and only) image + + # Extract bounding box + box = result.boxes.xyxy[0].cpu().numpy().astype(int) # xyxy format + x1, y1, x2, y2 = box + + # Extract confidence and class ID + confidence = result.boxes.conf[0].cpu().numpy().item() + class_id = int(result.boxes.cls[0].cpu().numpy().item()) + class_name = self.class_names[class_id] + + # Calculate pupil position (center of bounding box) + pupil_center_x = (x1 + x2) // 2 + pupil_center_y = (y1 + y2) // 2 + + # Calculate pupil diameter (average of width and height of bounding box) + pupil_diameter = (x2 - x1 + y2 - y1) // 2 + + pupil_data = { + "pupil_position": (pupil_center_x, pupil_center_y), + "pupil_diameter": pupil_diameter, + "class_name": class_name, + "confidence": confidence, + "bounding_box": box.tolist() # Convert numpy array to list for JSON serialization + } + + # Extract and draw segmentation mask + if result.masks: + # Get the mask for the first detection, upsampled to original image size + mask_np = result.masks.data[0].cpu().numpy() # Raw mask data + # Resize mask to original image dimensions if necessary (ultralytics usually returns scaled masks) + mask_resized = cv2.resize(mask_np, (image_bgr.shape[1], image_bgr.shape[0]), interpolation=cv2.INTER_LINEAR) + binary_mask = (mask_resized > 0.5).astype(np.uint8) * 255 # Threshold to binary + + # Draw bounding box + color = (0, 255, 0) # Green for pupil detection + cv2.rectangle(self.annotated_frame, (x1, y1), (x2, y2), color, 2) + + # Create a colored mask overlay + mask_color = np.array([0, 255, 0], dtype=np.uint8) # Green color for mask + colored_mask_overlay = np.zeros_like(self.annotated_frame, dtype=np.uint8) + colored_mask_overlay[binary_mask > 0] = mask_color + self.annotated_frame = cv2.addWeighted(self.annotated_frame, 1, colored_mask_overlay, 0.5, 0) + + # Draw label + label = f"{class_name}: {confidence:.2f}" + cv2.putText(self.annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) + else: + logging.info("No objects detected by YOLO model.") - # Draw segmentation on the frame - annotated_frame = image.copy() - if pupil_data and "bounding_box" in pupil_data: - x1, y1, x2, y2 = pupil_data["bounding_box"] - cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2) - self.annotated_frame = annotated_frame - return pupil_data else: logging.error(f"PythonBackend: Error grabbing frame: {grab_result.ErrorCode} {grab_result.ErrorDescription}") @@ -314,9 +326,11 @@ class PythonBackend: """ return self.annotated_frame +# The if __name__ == '__main__': block should be outside the class if __name__ == '__main__': # Example usage - config = {"camera_id": 0, "model_path": "yolov10.onnx"} + # Ensure 'yolov8n-seg.pt' is in src/controllerSoftware for this example to run + config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"} try: vision_system = VisionSystem(config) @@ -324,7 +338,10 @@ if __name__ == '__main__': # In a real application, this would run in a loop pupil_data = vision_system.get_pupil_data() - logging.info(f"Received pupil data: {pupil_data}") + if pupil_data: + logging.info(f"Received pupil data: {pupil_data}") + else: + logging.info("No pupil data received.") # Get and show the annotated frame annotated_frame = vision_system.get_annotated_frame() @@ -338,5 +355,4 @@ if __name__ == '__main__': except NotImplementedError as e: logging.error(e) except Exception as e: - logging.error(f"An error occurred: {e}") - + logging.error(f"An error occurred: {e}") \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..fd0c13de --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,21 @@ + +import pytest +from pypylon import pylon + +@pytest.fixture(scope="session") +def camera_available(): + """ + Pytest fixture that checks for a connected Basler camera. + If no camera is found, it skips the tests that depend on this fixture. + """ + try: + tl_factory = pylon.TlFactory.GetInstance() + devices = tl_factory.EnumerateDevices() + if not devices: + pytest.skip("No Basler camera found. Skipping tests that require a camera.") + + # You can also add a photo capture test here if you want + # For now, just detecting the camera is enough + + except Exception as e: + pytest.fail(f"An error occurred during camera detection: {e}") diff --git a/tests/test_camera_integration.py b/tests/test_camera_integration.py new file mode 100644 index 00000000..8ec4606e --- /dev/null +++ b/tests/test_camera_integration.py @@ -0,0 +1,52 @@ + +import pytest +from pypylon import pylon +import cv2 + +@pytest.mark.usefixtures("camera_available") +def test_capture_photo(): + """ + Tests that a photo can be captured from the Basler camera. + This test depends on the `camera_available` fixture in conftest.py. + """ + try: + # Get the transport layer factory. + tl_factory = pylon.TlFactory.GetInstance() + + # Get all attached devices and exit application if no device is found. + devices = tl_factory.EnumerateDevices() + + # Only grab from the first camera found + camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0])) + camera.Open() + + # Max number of images to grab + countOfImagesToGrab = 1 + + # Create an image format converter + converter = pylon.ImageFormatConverter() + converter.OutputPixelFormat = pylon.PixelType_BGR8packed + converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned + + # Start grabbing continuously + camera.StartGrabbingMax(countOfImagesToGrab) + + img = None + while camera.IsGrabbing(): + grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException) + + if grabResult.GrabSucceeded(): + # Access the image data + image = converter.Convert(grabResult) + img = image.GetArray() + + grabResult.Release() + + camera.Close() + + assert img is not None, "Failed to capture an image." + assert img.shape[0] > 0, "Captured image has zero height." + assert img.shape[1] > 0, "Captured image has zero width." + + except Exception as e: + pytest.fail(f"An error occurred during photo capture: {e}") diff --git a/tests/test_vision.py b/tests/test_vision.py index 4b813a66..374e6adc 100644 --- a/tests/test_vision.py +++ b/tests/test_vision.py @@ -22,7 +22,7 @@ class TestVisionSystem(unittest.TestCase): """ Set up a VisionSystem instance with a mocked backend for each test. """ - self.config = {"camera_id": 0, "model_path": "yolov10.onnx"} + self.config = {"camera_id": 0, "model_path": "yolov8n-seg.pt"} @patch('platform.system', return_value='Linux') @patch('vision.DeepStreamBackend') @@ -32,7 +32,9 @@ class TestVisionSystem(unittest.TestCase): """ mock_backend_instance = mock_backend_class.return_value vision_system = VisionSystem(self.config) - mock_backend_class.assert_called_once_with(self.config) + expected_config = self.config.copy() + expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name + mock_backend_class.assert_called_once_with(expected_config) self.assertEqual(vision_system._backend, mock_backend_instance) @patch('platform.system', return_value='Windows') @@ -43,7 +45,9 @@ class TestVisionSystem(unittest.TestCase): """ mock_backend_instance = mock_backend_class.return_value vision_system = VisionSystem(self.config) - mock_backend_class.assert_called_once_with(self.config) + expected_config = self.config.copy() + expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name + mock_backend_class.assert_called_once_with(expected_config) self.assertEqual(vision_system._backend, mock_backend_instance) @patch('platform.system', return_value='Darwin') @@ -54,7 +58,9 @@ class TestVisionSystem(unittest.TestCase): """ mock_backend_instance = mock_backend_class.return_value vision_system = VisionSystem(self.config) - mock_backend_class.assert_called_once_with(self.config) + expected_config = self.config.copy() + expected_config.setdefault('model_name', 'yolov8n-seg.pt') # Add default model_name + mock_backend_class.assert_called_once_with(expected_config) self.assertEqual(vision_system._backend, mock_backend_instance) @patch('platform.system', return_value='UnsupportedOS') @@ -109,35 +115,6 @@ class TestVisionSystem(unittest.TestCase): vision_system.get_annotated_frame() mock_backend_instance.get_annotated_frame.assert_called_once() - @patch('vision.logging') - @patch.dict('sys.modules', {'onnxruntime': MagicMock(), 'onnxruntime-gpu': None}) - def test_python_backend_cpu_fallback(self, mock_logging): - """ - Test that PythonBackend falls back to CPU when onnxruntime-gpu is not available. - """ - mock_ort = sys.modules['onnxruntime'] - mock_ort.get_available_providers.return_value = ['CPUExecutionProvider'] - - backend = PythonBackend(self.config) - - mock_logging.warning.assert_called_with("onnxruntime-gpu is not available or CUDA is not configured. Falling back to onnxruntime (CPU).") - self.assertEqual(backend.ort, mock_ort) - - @patch('vision.logging') - @patch.dict('sys.modules', {'onnxruntime': MagicMock()}) - def test_python_backend_gpu_selection(self, mock_logging): - """ - Test that PythonBackend selects GPU when onnxruntime-gpu is available. - """ - mock_ort_gpu = MagicMock() - mock_ort_gpu.get_available_providers.return_value = ['CUDAExecutionProvider', 'CPUExecutionProvider'] - sys.modules['onnxruntime'] = mock_ort_gpu - - backend = PythonBackend(self.config) - - mock_logging.info.assert_any_call("CUDA is available. Using onnxruntime-gpu.") - self.assertEqual(backend.ort, mock_ort_gpu) - def test_mock_backend_methods(self): """ Test the methods of the MockBackend. @@ -150,5 +127,9 @@ class TestVisionSystem(unittest.TestCase): frame = backend.get_annotated_frame() self.assertIsInstance(frame, np.ndarray) -if __name__ == '__main__': - unittest.main() + def test_model_exists(self): + """ + Tests that the YOLO model file (.pt) exists at the expected location. + """ + model_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../src/controllerSoftware', self.config['model_path'])) + self.assertTrue(os.path.exists(model_path), f"YOLO model file not found at {model_path}")