import sys import subprocess import threading import time import gc import json from flask import Flask, Response, render_template_string, jsonify # --- CONFIGURATION --- TARGET_NUM_CAMS = 3 DEFAULT_W = 1280 DEFAULT_H = 720 # --- PART 1: DETECTION (Unchanged) --- def scan_connected_cameras(): print("--- Scanning for Basler Cameras ---") detection_script = """ import sys try: from pypylon import pylon tl_factory = pylon.TlFactory.GetInstance() devices = tl_factory.EnumerateDevices() if not devices: print("NONE") else: results = [] for i in range(len(devices)): cam = pylon.InstantCamera(tl_factory.CreateDevice(devices[i])) cam.Open() serial = cam.GetDeviceInfo().GetSerialNumber() model = cam.GetDeviceInfo().GetModelName() is_color = model.endswith("c") or "Color" in model w = cam.Width.GetValue() h = cam.Height.GetValue() binning = 0 try: cam.BinningHorizontal.Value = 2 cam.BinningVertical.Value = 2 cam.BinningHorizontal.Value = 1 cam.BinningVertical.Value = 1 binning = 1 except: pass current_fmt = cam.PixelFormat.GetValue() cam.Close() results.append(f"{serial}:{w}:{h}:{binning}:{1 if is_color else 0}:{model}:{current_fmt}") print("|".join(results)) except Exception: print("NONE") """ try: result = subprocess.run([sys.executable, "-c", detection_script], capture_output=True, text=True) output = result.stdout.strip() if "NONE" in output or not output: return [] camera_list = [] entries = output.split('|') for entry in entries: parts = entry.split(':') camera_list.append({ "serial": parts[0], "width": int(parts[1]), "height": int(parts[2]), "binning": (parts[3] == '1'), "is_color": (parts[4] == '1'), "model": parts[5] }) return camera_list except: return [] DETECTED_CAMS = scan_connected_cameras() ACTUAL_CAMS_COUNT = len(DETECTED_CAMS) # --- RESOLUTION LOGIC --- if ACTUAL_CAMS_COUNT > 0: MASTER_W = DETECTED_CAMS[0]['width'] MASTER_H = DETECTED_CAMS[0]['height'] else: MASTER_W = DEFAULT_W MASTER_H = DEFAULT_H INTERNAL_WIDTH = 1280 scale = INTERNAL_WIDTH / MASTER_W INTERNAL_HEIGHT = int(MASTER_H * scale) if INTERNAL_HEIGHT % 2 != 0: INTERNAL_HEIGHT += 1 WEB_WIDTH = 1280 total_source_width = INTERNAL_WIDTH * TARGET_NUM_CAMS scale_tiled = WEB_WIDTH / total_source_width WEB_HEIGHT = int(INTERNAL_HEIGHT * scale_tiled) if WEB_HEIGHT % 2 != 0: WEB_HEIGHT += 1 print(f"LAYOUT: {TARGET_NUM_CAMS} Slots | Detected: {ACTUAL_CAMS_COUNT}") for c in DETECTED_CAMS: print(f" - Cam {c['serial']} ({c['model']}): {'COLOR' if c['is_color'] else 'MONO'}") # --- FLASK & GSTREAMER --- import gi gi.require_version('Gst', '1.0') from gi.repository import Gst, GLib app = Flask(__name__) frame_buffer = None buffer_lock = threading.Lock() current_fps = 0.0 frame_count = 0 start_time = time.time() class GStreamerPipeline(threading.Thread): def __init__(self): super().__init__() self.loop = GLib.MainLoop() self.pipeline = None def run(self): Gst.init(None) self.build_pipeline() self.pipeline.set_state(Gst.State.PLAYING) try: self.loop.run() except Exception as e: print(f"Error: {e}") finally: self.pipeline.set_state(Gst.State.NULL) def on_new_sample(self, sink): global frame_count, start_time, current_fps sample = sink.emit("pull-sample") if not sample: return Gst.FlowReturn.ERROR frame_count += 1 if frame_count % 30 == 0: elapsed = time.time() - start_time current_fps = 30 / elapsed if elapsed > 0 else 0 start_time = time.time() buffer = sample.get_buffer() success, map_info = buffer.map(Gst.MapFlags.READ) if not success: return Gst.FlowReturn.ERROR global frame_buffer with buffer_lock: frame_buffer = bytes(map_info.data) buffer.unmap(map_info) return Gst.FlowReturn.OK def build_pipeline(self): sources_str = "" for i in range(TARGET_NUM_CAMS): if i < len(DETECTED_CAMS): cam_info = DETECTED_CAMS[i] serial = cam_info['serial'] is_color = cam_info['is_color'] print(f"Slot {i}: Linking {serial} [{'Color' if is_color else 'Mono'}]") # --- 1. BASE SETTINGS (Common) --- # We DISABLE Throughput Limit to allow high bandwidth base_settings = ( f"pylonsrc device-serial-number={serial} " "cam::TriggerMode=Off " "cam::AcquisitionFrameRateEnable=true cam::AcquisitionFrameRate=60.0 " "cam::DeviceLinkThroughputLimitMode=Off " ) # Pre-scaler pre_scale = ( "nvvideoconvert compute-hw=1 ! " f"video/x-raw(memory:NVMM), format=NV12, width={INTERNAL_WIDTH}, height={INTERNAL_HEIGHT}, framerate=60/1 ! " ) if is_color: # --- 2A. COLOR SETTINGS (High Speed) --- # FIX: Force ExposureTime=20000.0 (20ms) even for Color. # If we leave it on Auto, it will slow down the Mono cameras. # We rely on 'GainAuto' to make the image bright enough. color_settings = ( f"{base_settings} " "cam::ExposureAuto=Off cam::ExposureTime=20000.0 " "cam::GainAuto=Continuous " "cam::Width=1920 cam::Height=1080 cam::OffsetX=336 cam::OffsetY=484 " "cam::PixelFormat=BayerBG8 " # Force Format ) source = ( f"{color_settings} ! " "bayer2rgb ! " # Debayer "videoconvert ! " "video/x-raw,format=RGBA ! " "nvvideoconvert compute-hw=1 ! " f"video/x-raw(memory:NVMM), format=NV12 ! " f"{pre_scale}" f"m.sink_{i} " ) else: # --- 2B. MONO SETTINGS (High Speed) --- # Force ExposureTime=20000.0 mono_settings = ( f"{base_settings} " "cam::ExposureAuto=Off cam::ExposureTime=20000.0 " "cam::GainAuto=Continuous " ) if cam_info['binning']: mono_settings += "cam::BinningHorizontal=2 cam::BinningVertical=2 " source = ( f"{mono_settings} ! " "video/x-raw,format=GRAY8 ! " "videoconvert ! " "video/x-raw,format=I420 ! " "nvvideoconvert compute-hw=1 ! " f"video/x-raw(memory:NVMM), format=NV12 ! " f"{pre_scale}" f"m.sink_{i} " ) else: # --- DISCONNECTED PLACEHOLDER --- source = ( f"videotestsrc pattern=black is-live=true ! " f"videorate ! " f"video/x-raw,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},format=I420,framerate=60/1 ! " f"textoverlay text=\"DISCONNECTED\" valignment=center halignment=center font-desc=\"Sans, 48\" ! " "nvvideoconvert compute-hw=1 ! " f"video/x-raw(memory:NVMM),format=NV12,width={INTERNAL_WIDTH},height={INTERNAL_HEIGHT},framerate=60/1 ! " f"m.sink_{i} " ) sources_str += source # 3. MUXER & PROCESSING processing = ( f"nvstreammux name=m batch-size={TARGET_NUM_CAMS} width={INTERNAL_WIDTH} height={INTERNAL_HEIGHT} " f"live-source=1 batched-push-timeout=33000 ! " f"nvmultistreamtiler width={WEB_WIDTH} height={WEB_HEIGHT} rows=1 columns={TARGET_NUM_CAMS} ! " "nvvideoconvert compute-hw=1 ! " "video/x-raw(memory:NVMM) ! " "videorate drop-only=true ! " "video/x-raw(memory:NVMM), framerate=30/1 ! " f"nvjpegenc quality=60 ! " "appsink name=sink emit-signals=True sync=False max-buffers=1 drop=True" ) pipeline_str = f"{sources_str} {processing}" print(f"Launching Optimized Pipeline (All Cams Forced to 20ms Shutter)...") self.pipeline = Gst.parse_launch(pipeline_str) appsink = self.pipeline.get_by_name("sink") appsink.connect("new-sample", self.on_new_sample) # --- FLASK --- @app.route('/') def index(): return render_template_string('''