import threading
import time

import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GObject


class GStreamerPipeline(threading.Thread):
    """Background thread that runs one GStreamer branch per camera slot.

    Each slot ends in an ``appsink`` named ``sink_<index>``; every sample
    pulled from it is copied into ``frame_buffers[index]`` as ``bytes``.
    Slots without a detected camera get a black "DISCONNECTED" test source.
    """

    # Number of stream-0 frames between two FPS recalculations.
    _FPS_WINDOW = 30

    def __init__(self, detected_cams, target_num_cams, internal_width, internal_height, web_width, web_height):
        """Store the configuration; the pipeline itself is built in ``run``.

        Args:
            detected_cams: Sequence of camera descriptions. ``build_pipeline``
                reads the keys ``'serial'`` and ``'is_color'`` from each entry,
                and ``'binning'`` for mono cameras.
            target_num_cams: Number of stream slots to create.
            internal_width: Frame width requested before JPEG encoding.
            internal_height: Frame height requested before JPEG encoding.
            web_width: Stored on the instance; not used by the code in this class.
            web_height: Stored on the instance; not used by the code in this class.
        """
        super().__init__()
        self.loop = GLib.MainLoop()
        self.pipeline = None
        self.target_num_cams = target_num_cams
        self.internal_width = internal_width
        self.internal_height = internal_height
        self.web_width = web_width
        self.web_height = web_height

        # One latest-frame slot and one lock per stream.
        self.frame_buffers = [None] * self.target_num_cams
        self.buffer_locks = [threading.Lock() for _ in range(self.target_num_cams)]

        # Overall FPS, measured on stream 0 only (not per stream).
        self.current_fps = 0.0
        self.frame_count = 0
        self.start_time = time.time()

        # No sorting happens here: the list is used in the order given, so the
        # caller decides which camera ends up in which stream slot.
        self.sorted_cams = detected_cams
        print(f"Sorted cameras for GStreamer: {self.sorted_cams}")

    def run(self):
        """Build the pipeline, start it and block in the GLib main loop.

        The pipeline is always set back to ``NULL`` when the loop ends or
        when starting it fails.
        """
        Gst.init(None)
        self.build_pipeline()
        if self.pipeline is None:
            print("GStreamer pipeline failed to build.")
            return

        try:
            state_change = self.pipeline.set_state(Gst.State.PLAYING)
            if state_change == Gst.StateChangeReturn.FAILURE:
                # Nothing will ever arrive; do not block in the loop forever.
                print("ERROR: GStreamer pipeline could not be set to PLAYING.")
                return
            self.loop.run()
        except Exception as e:
            print(f"Error: {e}")
        finally:
            self.pipeline.set_state(Gst.State.NULL)

    def stop(self):
        """Ask the main loop started by ``run`` to quit.

        Has no effect if the loop is not running (yet).
        """
        self.loop.quit()

    def on_new_sample_factory(self, stream_id):
        """Return a ``new-sample`` handler bound to ``stream_id``.

        The handler stores a copy of each pulled buffer in
        ``frame_buffers[stream_id]`` and, for stream 0, updates the FPS
        estimate every ``_FPS_WINDOW`` frames.
        """
        def on_new_sample(sink):
            sample = sink.emit("pull-sample")
            if not sample:
                return Gst.FlowReturn.ERROR

            # Update overall FPS counter from the first stream.
            if stream_id == 0:
                self.frame_count += 1
                if self.frame_count % self._FPS_WINDOW == 0:
                    now = time.time()
                    elapsed = now - self.start_time
                    self.current_fps = self._FPS_WINDOW / float(elapsed) if elapsed > 0 else 0
                    self.start_time = now

            buffer = sample.get_buffer()
            if buffer is None:
                return Gst.FlowReturn.ERROR

            success, map_info = buffer.map(Gst.MapFlags.READ)
            if not success:
                return Gst.FlowReturn.ERROR
            try:
                # Copy while mapped; the mapping is released even if this fails.
                frame = bytes(map_info.data)
            finally:
                buffer.unmap(map_info)

            # Only the reference swap needs the lock, not the copy above.
            with self.buffer_locks[stream_id]:
                self.frame_buffers[stream_id] = frame
            return Gst.FlowReturn.OK

        return on_new_sample

    def _jpeg_sink(self, index):
        """Return the encoder + appsink tail shared by every branch."""
        return (
            "nvjpegenc quality=60 ! "
            f"appsink name=sink_{index} emit-signals=True sync=False max-buffers=1 drop=True"
        )

    def _camera_branch(self, index, cam_info):
        """Return the launch string for a real (pylonsrc) camera in slot ``index``."""
        serial = cam_info['serial']
        is_color = cam_info['is_color']
        print(f"Setting up pipeline for Stream {index}: {serial} [{'Color' if is_color else 'Mono'}]")

        base_settings = (
            f"pylonsrc device-serial-number={serial} "
            "cam::TriggerMode=Off "
            "cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 "
            "cam::DeviceLinkThroughputLimitMode=Off "
        )
        # Exposure/gain settings are identical for color and mono cameras.
        settings = (
            f"{base_settings} "
            "cam::ExposureAuto=Off cam::ExposureTime=20000.0 "
            "cam::GainAuto=Continuous "
        )

        if is_color:
            settings += (
                "cam::Width=1920 cam::Height=1080 "
                "cam::PixelFormat=BayerBG8 "
            )
            conversion = (
                "bayer2rgb ! "  # Debayer
                "videoconvert ! "
                "video/x-raw,format=RGBA ! "
            )
        else:
            # A missing 'binning' key is treated as "no binning".
            if cam_info.get('binning'):
                settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 "
            conversion = (
                "video/x-raw,format=GRAY8 ! "
                "videoconvert ! "
                "video/x-raw,format=I420 ! "
            )

        return (
            f"{settings} ! "
            f"{conversion}"
            "nvvideoconvert compute-hw=1 ! "
            f"video/x-raw(memory:NVMM), format=NV12, width={self.internal_width}, height={self.internal_height}, framerate=60/1 ! "
            f"{self._jpeg_sink(index)}"
        )

    def _placeholder_branch(self, index):
        """Return the launch string for a slot that has no camera attached."""
        return (
            "videotestsrc pattern=black is-live=true ! "
            "videorate ! "
            f"video/x-raw,width={self.internal_width},height={self.internal_height},format=I420,framerate=60/1 ! "
            'textoverlay text="DISCONNECTED" valignment=center halignment=center font-desc="Sans, 48" ! '
            "nvvideoconvert compute-hw=1 ! "
            f"video/x-raw(memory:NVMM),format=NV12,width={self.internal_width},height={self.internal_height},framerate=60/1 ! "
            f"{self._jpeg_sink(index)}"
        )

    def build_pipeline(self):
        """Assemble and parse the launch string and hook up every appsink.

        On success ``self.pipeline`` holds the parsed pipeline; on a parse
        failure it is left as ``None`` and an error is printed.
        """
        sources_and_sinks_str = []
        for i in range(self.target_num_cams):
            if i < len(self.sorted_cams):
                source_and_sink = self._camera_branch(i, self.sorted_cams[i])
            else:
                # Placeholder for disconnected cameras.
                source_and_sink = self._placeholder_branch(i)
            sources_and_sinks_str.append(source_and_sink)

        pipeline_str = " ".join(sources_and_sinks_str)
        print("\n--- GStreamer Pipeline String ---")
        print(pipeline_str)
        print("---------------------------------\n")

        try:
            self.pipeline = Gst.parse_launch(pipeline_str)
        except GLib.Error as e:
            # PyGObject reports parse failures as GLib.Error, not as None.
            print(f"ERROR: GStreamer pipeline failed to parse: {e}")
            self.pipeline = None

        if self.pipeline is None:
            print("ERROR: GStreamer pipeline failed to parse. Check pipeline string for errors.")
            return

        for i in range(self.target_num_cams):
            appsink = self.pipeline.get_by_name(f"sink_{i}")
            if appsink:
                # Set caps on appsink to ensure it's negotiating JPEG.
                appsink.set_property(
                    "caps",
                    Gst.Caps.from_string(
                        "image/jpeg,width=(int)[1, 2147483647],height=(int)[1, 2147483647]"
                    ),
                )
                appsink.connect("new-sample", self.on_new_sample_factory(i))
            else:
                print(f"Error: sink_{i} not found in pipeline.")

    def get_frame_by_id(self, stream_id):
        """Return the latest stored frame for ``stream_id``.

        Returns ``None`` when the id is out of range or no frame has been
        stored for that stream yet.
        """
        if 0 <= stream_id < self.target_num_cams:
            with self.buffer_locks[stream_id]:
                return self.frame_buffers[stream_id]
        return None

    def get_fps(self):
        """Return the current FPS estimate rounded to one decimal place."""
        return round(self.current_fps, 1)