# ROI = Region of Interest import sys import os from argparse import ArgumentParser from configparser import ConfigParser from time import time import pyds import cv2 import gi gi.require_version("Gst", "1.0") from gi.repository import Gst, GLib counted_ids = set() # Menggunakan Set agar satu ID hanya dihitung satu kali counter = dict() last_counter = dict() counter_history = dict() registry = dict(empty_time=time()) counter_bg_colors = [ (0.0, 0.0, 0.0, 0.5), # R, G, B, Alpha (0.0, 0.0, 1.0, 0.5)] def video_info(filename): cap = cv2.VideoCapture(filename) if not cap.isOpened(): print("Error: Tidak dapat membuka file video.") cap.realease() sys.exit() width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) fps = cap.get(cv2.CAP_PROP_FPS) width = int(width) height = int(height) fps = int(fps) return width, height, fps def counter_text(d): r = [] for key, value in d.items(): s = f"{key}={value}" r.append(s) return " ".join(r) def probe_for_video(pad, info, u_data): def show_counter(index, data): text = display_meta.text_params[index] text.display_text = counter_text(data) text.x_offset, text.y_offset = 50, 50 + 50 * index text.font_params.font_size = 15 text.font_params.font_color.set(1.0, 1.0, 1.0, 1.0) text.set_bg_clr = 1 # background true bg_color = counter_bg_colors[index] text.text_bg_clr.set(*bg_color) gst_buffer = info.get_buffer() if not gst_buffer: return Gst.PadProbeReturn.OK batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer)) if not batch_meta: return Gst.PadProbeReturn.OK l_frame = batch_meta.frame_meta_list while l_frame is not None: try: frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data) except StopIteration: break if counter and registry["empty_time"] and \ time() - registry["empty_time"] > counter_age: durasi_sepi = time() - registry["empty_time"] counter_history.clear() counter_history.update(counter) counter.clear() last_counter.clear() # Visualisasi Kotak ROI di Layar display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta) display_meta.num_rects = 1 rect = display_meta.rect_params[0] rect.left = roi_left rect.top = roi_top rect.width = roi_width rect.height = roi_height rect.border_width = 3 rect.border_color.set(0.0, 1.0, 0.0, 1.0) # Hijau # Tambahkan teks counter display_meta.num_labels = 2 show_counter(0, counter) show_counter(1, counter_history) pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta) # Logika Box Crossing l_obj = frame_meta.obj_meta_list if l_obj is None: if counter and not registry["empty_time"]: registry["empty_time"] = time() elif registry["empty_time"]: registry["empty_time"] = None while l_obj is not None: try: obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data) except StopIteration: break obj_id = obj_meta.object_id label = obj_meta.obj_label # Objek yang sudah masuk kotak ROI diberi warna kuning if obj_id in counted_ids: obj_meta.rect_params.border_color.set(1.0, 1.0, 0.0, 1.0) obj_meta.text_params.font_params.font_color.set( 1.0, 1.0, 0.0, 1.0) # Koordinat objek o_left = obj_meta.rect_params.left o_top = obj_meta.rect_params.top o_width = obj_meta.rect_params.width o_height = obj_meta.rect_params.height o_right = o_left + o_width o_bottom = o_top + o_height # Hitung titik tengah objek center_x = o_left + (o_width / 2) center_y = o_top + (o_height / 2) # Cek apakah titik tengah objek masuk ke dalam roi box if obj_id not in counted_ids: if roi_left <= center_x <= roi_x_max and \ roi_top <= center_y <= roi_y_max: if label in counter: counter[label] += 1 else: counter[label] = 1 counted_ids.add(obj_id) print(f"{label} ID {obj_id} masuk sepenuhnya ke kotak") try: l_obj = l_obj.next except StopIteration: break if last_counter != counter: last_counter.update(counter) print(f"Frame {frame_meta.frame_num} {counter}") try: l_frame = l_frame.next except StopIteration: break return Gst.PadProbeReturn.OK def probe_for_image(pad, info, u_data): gst_buffer = info.get_buffer() if not gst_buffer: return Gst.PadProbeReturn.OK batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer)) l_frame = batch_meta.frame_meta_list while l_frame is not None: try: frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data) except StopIteration: break l_obj = frame_meta.obj_meta_list while l_obj is not None: try: obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data) except StopIteration: break label = obj_meta.obj_label if label in counter: counter[label] += 1 else: counter[label] = 1 try: l_obj = l_obj.next except StopIteration: break try: l_frame = l_frame.next except StopIteration: break return Gst.PadProbeReturn.OK def bus_call(bus, message, loop): t = message.type if t == Gst.MessageType.EOS: print("End-of-stream\n") loop.quit() elif t == Gst.MessageType.ERROR: err, debug = message.parse_error() print(f"Error: {err}: {debug}\n") loop.quit() return True def remove_engine_if_older(): conf = ConfigParser() conf.read(option.conf) cf = dict(conf.items('property')) if 'onnx-file' in cf: model_file = cf['onnx-file'] elif 'model-file' in cf: model_file = cf['model-file'] else: model_file = None if model_file: engine_file = cf['model-engine-file'] if os.path.exists(engine_file): engine_time = os.path.getmtime(engine_file) model_time = os.path.getmtime(model_file) if model_time > engine_time: print(f"Hapus {engine_file} karena {model_file} lebih baru") os.remove(engine_file) BASE_DEEPSTREAM = "/opt/nvidia/deepstream/deepstream-6.0" DS_PATH = f"{BASE_DEEPSTREAM}/samples/configs/deepstream-app" TRACKER_LIB = f"{BASE_DEEPSTREAM}/lib/libnvds_nvmultiobjecttracker.so" help_roi_left = f"region of interest left" help_roi_top = f"region of interest top" help_roi_width = f"region of interest width" help_roi_height = f"region of interest height" conf_file = os.path.join(DS_PATH, "config_infer_primary_nano.txt") help_conf = f"default {conf_file}" conf_tracker_file = f"{DS_PATH}/config_tracker_NvDCF_accuracy.txt" help_conf_tracker = f"default {conf_tracker_file}" counter_age = 5 # seconds help_age = f"default {counter_age}" pars = ArgumentParser() pars.add_argument("--input-video", help="default dari kamera") pars.add_argument("--output-video", help="default ke layar") pars.add_argument("--input-width", type=int) pars.add_argument("--input-height", type=int) pars.add_argument("--fps", type=int) pars.add_argument("--roi-left", help=help_roi_left, type=int) pars.add_argument("--roi-top", help=help_roi_top, type=int) pars.add_argument("--roi-width", help=help_roi_width, type=int) pars.add_argument("--roi-height", help=help_roi_height, type=int) pars.add_argument("--conf", default=conf_file, help=help_conf) pars.add_argument( "--conf-tracker", default=conf_tracker_file, help=help_conf_tracker) pars.add_argument( "--counter-age", default=counter_age, help=help_age, type=int) option = pars.parse_args(sys.argv[1:]) counter_age = option.counter_age * 5 input_video_uri = None if option.input_video: if os.path.exists(option.input_video): input_video = os.path.realpath(option.input_video) input_width, input_height, fps = video_info(input_video) prefix, ext = os.path.splitext(option.input_video) if ext.lower() == '.jpg': pipelines = [f"filesrc location={option.input_video}"] else: input_video_uri = f"file://{input_video}" else: if option.input_width: input_width = option.input_width input_height = option.input_height else: input_width = 1280 input_height = 720 if option.fps: fps = option.fps else: fps = 15 # standar aman RTSP input_video_uri = option.input_video if input_video_uri: pipelines = [f"uridecodebin uri={input_video_uri}"] else: pipelines += ["jpegparse", "nvv4l2decoder"] pipelines += [ "nvvideoconvert", "video/x-raw(memory:NVMM),format=NV12", ("mux.sink_0 nvstreammux name=mux batch-size=1 " f"width={input_width} height={input_height} " "batched-push-timeout=40000")] else: # Standar kamera IMX219 input_video_uri = True # Kamera input_width = 3264 input_height = 1848 fps = 28 pipelines = [ "nvarguscamerasrc bufapi-version=true", (f"video/x-raw(memory:NVMM)," f"width={input_width},height={input_height}," f"format=NV12,framerate={fps}/1"), ("mux.sink_0 nvstreammux name=mux batch-size=1 " f"width={input_width} height={input_height} " "live-source=1")] pipelines += [f"nvinfer config-file-path={option.conf}"] if input_video_uri: pipelines += [ ("nvtracker " f"tracker-width={input_width} " f"tracker-height={input_height} " f"ll-lib-file={TRACKER_LIB} " f"ll-config-file={option.conf_tracker}")] pipelines += ["nvvideoconvert", "nvdsosd name=onscreendisplay"] if option.output_video: pipelines += ["nvvideoconvert"] if input_video_uri: pipelines += [ "nvv4l2h264enc bitrate=4000000", "h264parse", # "qtmux", MP4 gagal bila mati mendadak "matroskamux"] # MKV tak masalah mati mendadak else: pipelines += ["jpegenc"] pipelines += [f"filesink location={option.output_video}"] else: pipelines += [ "nvegltransform", "nveglglessink sync=0"] pipeline_str = " ! ".join(pipelines) if option.roi_left is None: roi_left = 0 else: roi_left = option.roi_left if option.roi_width is None: roi_width = input_width else: roi_width = option.roi_width if option.roi_top is None: roi_top = 0 else: roi_top = option.roi_top if option.roi_height is None: roi_height = input_height else: roi_height = option.roi_height roi_x_max = roi_left + roi_width roi_y_max = roi_top + roi_height remove_engine_if_older() Gst.init(None) pipeline = Gst.parse_launch(pipeline_str) osd_element = pipeline.get_by_name("onscreendisplay") osd_sink_pad = osd_element.get_static_pad("sink") probe_func = input_video_uri and probe_for_video or probe_for_image osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, probe_func, 0) loop = GLib.MainLoop() bus = pipeline.get_bus() bus.add_signal_watch() bus.connect("message", bus_call, loop) pipeline.set_state(Gst.State.PLAYING) try: loop.run() except KeyboardInterrupt: pass pipeline.set_state(Gst.State.NULL) print("Objects:", counter)