import sys
import platform
import os

import numpy as np
import cv2
import logging

# Configure logging for the whole module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class VisionSystem:
    """
    The main class for the vision system, responsible for pupil segmentation.

    Acts as a facade: it selects a platform-specific backend at construction
    time and delegates start/stop/data queries to it.
    """

    def __init__(self, config):
        """
        Args:
            config (dict): Configuration passed through to the backend
                (e.g. 'model_path' for PythonBackend).
        """
        self.config = config
        self._backend = self._initialize_backend()

    def _initialize_backend(self):
        """
        Initializes the appropriate backend based on the environment and OS.

        Order of precedence:
          1. PUPILOMETER_ENV == "test"            -> MockBackend
          2. Linux/Windows with GStreamer present -> DeepStreamBackend
          3. Linux/Windows without GStreamer      -> PythonBackend (fallback)
          4. macOS (Darwin)                       -> PythonBackend
          5. anything else                        -> NotImplementedError

        Returns:
            MockBackend | DeepStreamBackend | PythonBackend: the chosen backend.

        Raises:
            NotImplementedError: On unsupported operating systems.
        """
        # If in a test environment, use the MockBackend regardless of OS.
        if os.environ.get("PUPILOMETER_ENV") == "test":
            logging.info("PUPILOMETER_ENV is set to 'test'. Initializing Mock backend.")
            return MockBackend(self.config)

        os_name = platform.system()
        if os_name in ("Linux", "Windows"):
            logging.info(f"Operating system is {os_name}. Attempting to initialize DeepStream backend.")
            try:
                import gi
                # gi.require_version raises ValueError when the Gst typelib
                # is missing; ImportError fires when gi itself is absent.
                gi.require_version('Gst', '1.0')
                from gi.repository import Gst
                Gst.init(None)
                logging.info("DeepStream (GStreamer) is available.")
                return DeepStreamBackend(self.config)
            except (ImportError, ValueError) as e:
                logging.warning(f"Could not initialize DeepStreamBackend: {e}. Falling back to PythonBackend.")
                return PythonBackend(self.config)
        elif os_name == "Darwin":
            logging.info("Operating system is macOS. Initializing Python backend.")
            return PythonBackend(self.config)
        else:
            logging.error(f"Unsupported operating system: {os_name}")
            raise NotImplementedError(f"Unsupported operating system: {os_name}")

    def start(self):
        """ Starts the vision system. """
        self._backend.start()

    def stop(self):
        """ Stops the vision system. """
        self._backend.stop()

    def get_pupil_data(self):
        """ Returns the latest pupil segmentation data. """
        return self._backend.get_pupil_data()

    def get_annotated_frame(self):
        """ Returns the latest annotated frame. """
        return self._backend.get_annotated_frame()


class MockBackend:
    """ A mock backend for testing purposes. """

    def __init__(self, config):
        self.config = config
        logging.info("MockBackend initialized.")

    def start(self):
        """ No-op start; only logs. """
        logging.info("MockBackend started.")

    def stop(self):
        """ No-op stop; only logs. """
        logging.info("MockBackend stopped.")

    def get_pupil_data(self):
        """ Returns fixed placeholder pupil data. """
        logging.info("Getting pupil data from MockBackend.")
        return {
            "pupil_position": (123, 456),
            "pupil_diameter": 789,
            "info": "mock_data"
        }

    def get_annotated_frame(self):
        """ Returns a placeholder image. """
        frame = np.zeros((480, 640, 3), np.uint8)
        cv2.putText(frame, "Mock Camera Feed", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        return frame


class DeepStreamBackend:
    """
    A class to handle pupil segmentation on Jetson/Windows using DeepStream.

    Thin delegating wrapper around the project-local DeepStreamPipeline.
    """

    def __init__(self, config):
        """
        Initializes the DeepStreamBackend.

        Args:
            config (dict): A dictionary containing configuration parameters.
        """
        # Local import: deepstream_pipeline is only importable on platforms
        # where this backend is actually selected.
        from deepstream_pipeline import DeepStreamPipeline
        self.config = config
        self.pipeline = DeepStreamPipeline(config)
        logging.info("DeepStreamBackend initialized.")

    def start(self):
        """ Starts the DeepStream pipeline. """
        self.pipeline.start()
        logging.info("DeepStreamBackend started.")

    def stop(self):
        """ Stops the DeepStream pipeline. """
        self.pipeline.stop()
        logging.info("DeepStreamBackend stopped.")

    def get_pupil_data(self):
        """ Retrieves pupil data from the DeepStream pipeline. """
        return self.pipeline.get_data()

    def get_annotated_frame(self):
        """ Retrieves the annotated frame from the DeepStream pipeline. """
        return self.pipeline.get_annotated_frame()


class PythonBackend:
    """
    A class to handle pupil segmentation on macOS using pypylon and ONNX Runtime.
    """

    def __init__(self, config):
        """
        Initializes the PythonBackend.

        Args:
            config (dict): A dictionary containing configuration parameters
                such as 'model_path'.

        Raises:
            ImportError: If onnxruntime is not installed at all.
        """
        self.config = config
        self.camera = None
        self.inference_session = None
        self.annotated_frame = None

        # Import once and inspect the available providers directly.
        # (The previous version raised ImportError as control flow and then
        # re-imported the same module in the handler, so a genuinely missing
        # onnxruntime crashed inside the "fallback" path anyway.)
        import onnxruntime as ort
        if 'CUDAExecutionProvider' in ort.get_available_providers():
            logging.info("CUDA is available. Using onnxruntime-gpu.")
        else:
            logging.warning("onnxruntime-gpu is not available or CUDA is not configured. Falling back to onnxruntime (CPU).")
        self.ort = ort
        logging.info("PythonBackend initialized.")

    def start(self):
        """
        Initializes the Basler camera and loads the ONNX model.

        Camera and model failures are logged but not raised: the backend
        starts in a degraded state and get_pupil_data() returns None until
        both resources are available.

        Raises:
            ImportError: If pypylon is not installed.
        """
        try:
            from pypylon import pylon
        except ImportError:
            raise ImportError("pypylon is not installed. Cannot start PythonBackend.")

        try:
            # Initialize the camera and grab continuously, keeping only the
            # most recent image so processing never lags behind acquisition.
            self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
            self.camera.Open()
            self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
            logging.info("PythonBackend: Basler camera opened and started grabbing.")
        except Exception as e:
            logging.error(f"PythonBackend: Error opening Basler camera: {e}")
            self.camera = None

        try:
            # Load the ONNX model
            self.inference_session = self.ort.InferenceSession(self.config['model_path'])
            logging.info(f"PythonBackend: ONNX model loaded from {self.config['model_path']}.")
        except Exception as e:
            logging.error(f"PythonBackend: Error loading ONNX model: {e}")
            self.inference_session = None

        logging.info("PythonBackend started.")

    def stop(self):
        """ Releases the camera resources. """
        if self.camera and self.camera.IsGrabbing():
            self.camera.StopGrabbing()
            logging.info("PythonBackend: Basler camera stopped grabbing.")
        if self.camera and self.camera.IsOpen():
            self.camera.Close()
            logging.info("PythonBackend: Basler camera closed.")
        logging.info("PythonBackend stopped.")

    def _postprocess_output(self, outputs, original_image_shape):
        """
        Post-processes the raw output from the YOLOv10 model.

        Args:
            outputs (list): A list of numpy arrays representing the model's output.
            original_image_shape (tuple): The shape of the original image (height, width).

        Returns:
            dict: A dictionary containing the processed pupil data.
        """
        # TODO: Implement the actual post-processing logic.
        # This will involve non-maximum suppression (NMS) and parsing the
        # bounding boxes and segmentation masks.
        logging.info("Post-processing model output...")
        pupil_data = {
            "raw_model_output_shape": [o.shape for o in outputs],
            "pupil_position": (100, 120),  # Placeholder
            "pupil_diameter": 30,  # Placeholder
            "bounding_box": [50, 70, 150, 170]  # Placeholder [x1, y1, x2, y2]
        }
        return pupil_data

    def get_pupil_data(self):
        """
        Grabs a frame from the camera, runs inference, and returns pupil data.

        Returns:
            dict | None: Pupil data from _postprocess_output(), or None if
            the camera/model is not ready or any grab/inference step fails.
        """
        if not self.camera or not self.camera.IsGrabbing():
            logging.warning("PythonBackend: Camera not ready.")
            return None
        if not self.inference_session:
            logging.warning("PythonBackend: Inference session not ready.")
            return None

        grab_result = None
        try:
            from pypylon import pylon
            grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
            if grab_result.GrabSucceeded():
                image = grab_result.Array
                original_shape = image.shape

                # Image preprocessing. A 2-D array is demosaiced as a Bayer
                # pattern. NOTE(review): this assumes the sensor outputs
                # BG-pattern Bayer, not plain grayscale — confirm against the
                # camera's pixel format configuration.
                if len(image.shape) == 2:
                    image = cv2.cvtColor(image, cv2.COLOR_BAYER_BG2RGB)
                input_shape = (640, 640)
                resized_image = cv2.resize(image, input_shape)
                normalized_image = resized_image.astype(np.float32) / 255.0
                # HWC -> CHW, then add the batch dimension expected by the model.
                transposed_image = np.transpose(normalized_image, (2, 0, 1))
                input_tensor = np.expand_dims(transposed_image, axis=0)

                # Run inference
                input_name = self.inference_session.get_inputs()[0].name
                output_names = [o.name for o in self.inference_session.get_outputs()]
                outputs = self.inference_session.run(output_names, {input_name: input_tensor})

                # Post-process the output
                pupil_data = self._postprocess_output(outputs, original_shape)

                # Draw segmentation on the frame
                annotated_frame = image.copy()
                if pupil_data and "bounding_box" in pupil_data:
                    x1, y1, x2, y2 = pupil_data["bounding_box"]
                    cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                self.annotated_frame = annotated_frame

                return pupil_data
            else:
                logging.error(f"PythonBackend: Error grabbing frame: {grab_result.ErrorCode} {grab_result.ErrorDescription}")
                return None
        except Exception as e:
            logging.error(f"PythonBackend: An error occurred during frame grabbing or inference: {e}")
            return None
        finally:
            # Explicit None check: don't rely on the truthiness of a pypylon
            # GrabResult object.
            if grab_result is not None:
                grab_result.Release()

    def get_annotated_frame(self):
        """ Returns the latest annotated frame (None until the first successful grab). """
        return self.annotated_frame


if __name__ == '__main__':
    # Example usage
    config = {"camera_id": 0, "model_path": "yolov10.onnx"}
    try:
        vision_system = VisionSystem(config)
        vision_system.start()

        # In a real application, this would run in a loop
        pupil_data = vision_system.get_pupil_data()
        logging.info(f"Received pupil data: {pupil_data}")

        # Get and show the annotated frame
        annotated_frame = vision_system.get_annotated_frame()
        if annotated_frame is not None:
            cv2.imshow("Annotated Frame", annotated_frame)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

        vision_system.stop()
    except NotImplementedError as e:
        logging.error(e)
    except Exception as e:
        logging.error(f"An error occurred: {e}")