Snippet $168 authored by Owo Sugiana

NVIDIA Jetson Nano: Counting Objects
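
The script below assembles a DeepStream/GStreamer pipeline on the Jetson Nano that detects objects with nvinfer and, for video sources, tracks them with nvtracker. For a video file or RTSP stream it counts each tracked object ID once when its centre point enters a configurable region of interest (ROI); for a JPEG image it simply counts the detections per label. Output goes either to the screen or to a file. One possible invocation (file names here are purely illustrative):

python3 analisa_video.py --input-video traffic.mp4 --output-video output.mkv \
    --roi-left 100 --roi-top 200 --roi-width 640 --roi-height 360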

analisa_video.py
# ROI = Region of Interest
import sys
import os
from argparse import ArgumentParser
from configparser import ConfigParser
from time import time
import pyds
import cv2
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib


counted_ids = set()  # a set, so each tracked ID is counted only once
counter = dict()
last_counter = dict()
counter_history = dict()
registry = dict(empty_time=time())
counter_bg_colors = [
    (0.0, 0.0, 0.0, 0.5),  # R, G, B, Alpha
    (0.0, 0.0, 1.0, 0.5)]


def video_info(filename):
    cap = cv2.VideoCapture(filename)
    if not cap.isOpened():
        print("Error: cannot open the video file.")
        cap.release()
        sys.exit()
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    cap.release()
    return width, height, fps


def counter_text(d):
    r = []
    for key, value in d.items():
        s = f"{key}={value}"
        r.append(s)
    return " ".join(r)


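# Pad probe on the OSD sink pad, used when the input is a video file or an
# RTSP stream: draws the ROI rectangle and two counter overlays (live counts
# plus the counts archived after the scene has been empty for counter_age
# seconds), and counts every tracked object ID once when its centre point
# falls inside the ROI.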
def probe_for_video(pad, info, u_data):
    def show_counter(index, data):
        text = display_meta.text_params[index]
        text.display_text = counter_text(data)
        text.x_offset, text.y_offset = 50, 50 + 50 * index
        text.font_params.font_size = 15
        text.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        text.set_bg_clr = 1  # enable the text background colour
        bg_color = counter_bg_colors[index]
        text.text_bg_clr.set(*bg_color)

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        # After the scene has been empty for counter_age seconds, archive the
        # counts into counter_history and reset the live counter
        if counter and registry["empty_time"] and \
                time() - registry["empty_time"] > counter_age:
            counter_history.clear()
            counter_history.update(counter)
            counter.clear()
            last_counter.clear()
        # Draw the ROI box on screen
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_rects = 1
        rect = display_meta.rect_params[0]
        rect.left = roi_left
        rect.top = roi_top
        rect.width = roi_width
        rect.height = roi_height
        rect.border_width = 3
        rect.border_color.set(0.0, 1.0, 0.0, 1.0)  # green
        # Add the counter text overlays
        display_meta.num_labels = 2
        show_counter(0, counter)
        show_counter(1, counter_history)
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        # Box-crossing logic
        l_obj = frame_meta.obj_meta_list
        if l_obj is None:
            if counter and not registry["empty_time"]:
                registry["empty_time"] = time()
        elif registry["empty_time"]:
            registry["empty_time"] = None
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_id = obj_meta.object_id
            label = obj_meta.obj_label
            # Objects already counted in the ROI box are drawn in yellow
            if obj_id in counted_ids:
                obj_meta.rect_params.border_color.set(1.0, 1.0, 0.0, 1.0)
                obj_meta.text_params.font_params.font_color.set(
                    1.0, 1.0, 0.0, 1.0)
            # Object bounding-box coordinates and centre point
            o_left = obj_meta.rect_params.left
            o_top = obj_meta.rect_params.top
            o_width = obj_meta.rect_params.width
            o_height = obj_meta.rect_params.height
            center_x = o_left + (o_width / 2)
            center_y = o_top + (o_height / 2)
            # Check whether the object's centre point is inside the ROI box
            if obj_id not in counted_ids:
                if roi_left <= center_x <= roi_x_max and \
                   roi_top <= center_y <= roi_y_max:
                    counter[label] = counter.get(label, 0) + 1
                    counted_ids.add(obj_id)
                    print(f"{label} ID {obj_id} entered the ROI box")
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        if last_counter != counter:
            last_counter.update(counter)
            print(f"Frame {frame_meta.frame_num} {counter}")
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK


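# Pad probe used for JPEG and camera input, where no tracker is attached:
# every detected object in the frame is simply counted per label.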
def probe_for_image(pad, info, u_data):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            label = obj_meta.obj_label
            counter[label] = counter.get(label, 0) + 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK


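# GStreamer bus handler: stops the main loop on end-of-stream or on error.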
def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(f"Error: {err}: {debug}\n")
        loop.quit()
    return True


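# If the ONNX/Caffe model file referenced by the nvinfer config is newer than
# the cached TensorRT engine, delete the engine so it is rebuilt on start-up.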
def remove_engine_if_older():
    conf = ConfigParser()
    conf.read(option.conf)
    cf = dict(conf.items('property'))
    if 'onnx-file' in cf:
        model_file = cf['onnx-file']
    elif 'model-file' in cf:
        model_file = cf['model-file']
    else:
        model_file = None
    if model_file:
        engine_file = cf.get('model-engine-file')
        if engine_file and os.path.exists(engine_file):
            engine_time = os.path.getmtime(engine_file)
            model_time = os.path.getmtime(model_file)
            if model_time > engine_time:
                print(f"Deleting {engine_file}: {model_file} is newer")
                os.remove(engine_file)


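# DeepStream 6.0 installation paths: the sample nvinfer/tracker configuration
# directory and the built-in low-level multi-object tracker library.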
BASE_DEEPSTREAM = "/opt/nvidia/deepstream/deepstream-6.0"
DS_PATH = f"{BASE_DEEPSTREAM}/samples/configs/deepstream-app"
TRACKER_LIB = f"{BASE_DEEPSTREAM}/lib/libnvds_nvmultiobjecttracker.so"

help_roi_left = "region of interest left"
help_roi_top = "region of interest top"
help_roi_width = "region of interest width"
help_roi_height = "region of interest height"

conf_file = os.path.join(DS_PATH, "config_infer_primary_nano.txt")
help_conf = f"default {conf_file}"

conf_tracker_file = f"{DS_PATH}/config_tracker_NvDCF_accuracy.txt"
help_conf_tracker = f"default {conf_tracker_file}"

counter_age = 5  # seconds
help_age = f"default {counter_age}"

pars = ArgumentParser()
pars.add_argument("--input-video", help="default: camera input")
pars.add_argument("--output-video", help="default: display on screen")
pars.add_argument("--input-width", type=int)
pars.add_argument("--input-height", type=int)
pars.add_argument("--fps", type=int)
pars.add_argument("--roi-left", help=help_roi_left, type=int)
pars.add_argument("--roi-top", help=help_roi_top, type=int)
pars.add_argument("--roi-width", help=help_roi_width, type=int)
pars.add_argument("--roi-height", help=help_roi_height, type=int)
pars.add_argument("--conf", default=conf_file, help=help_conf)
pars.add_argument(
    "--conf-tracker", default=conf_tracker_file, help=help_conf_tracker)
pars.add_argument(
    "--counter-age", default=counter_age, help=help_age, type=int)
option = pars.parse_args(sys.argv[1:])

counter_age = option.counter_age
input_video_uri = None

if option.input_video:
    if os.path.exists(option.input_video):
        input_video = os.path.realpath(option.input_video)
        input_width, input_height, fps = video_info(input_video)
        _, ext = os.path.splitext(option.input_video)
        if ext.lower() == '.jpg':
            pipelines = [f"filesrc location={option.input_video}"]
        else:
            input_video_uri = f"file://{input_video}"
    else:
        if option.input_width:
            input_width = option.input_width
            input_height = option.input_height
        else:
            input_width = 1280
            input_height = 720
        if option.fps:
            fps = option.fps
        else:
            fps = 15  # safe default for RTSP
        input_video_uri = option.input_video
    if input_video_uri:
        pipelines = [f"uridecodebin uri={input_video_uri}"]
    else:
        pipelines += ["jpegparse", "nvv4l2decoder"]
    pipelines += [
        "nvvideoconvert",
        "video/x-raw(memory:NVMM),format=NV12",
        ("mux.sink_0 nvstreammux name=mux batch-size=1 "
         f"width={input_width} height={input_height} "
         "batched-push-timeout=40000")]
else:
    # IMX219 camera sensor defaults
    input_width = 3264
    input_height = 1848
    fps = option.fps or 28  # use --fps when given, else the sensor default
    pipelines = [
        "nvarguscamerasrc bufapi-version=true",
        (f"video/x-raw(memory:NVMM),"
         f"width={input_width},height={input_height},"
         f"format=NV12,framerate={fps}/1"),
        ("mux.sink_0 nvstreammux name=mux batch-size=1 "
         f"width={input_width} height={input_height} "
         "live-source=1")]
pipelines += [f"nvinfer config-file-path={option.conf}"]
if input_video_uri:
    pipelines += [
        ("nvtracker "
         f"tracker-width={input_width} "
         f"tracker-height={input_height} "
         f"ll-lib-file={TRACKER_LIB} "
         f"ll-config-file={option.conf_tracker}")]
pipelines += ["nvvideoconvert", "nvdsosd name=onscreendisplay"]
if option.output_video:
    pipelines += ["nvvideoconvert"]
    if input_video_uri:
        pipelines += [
            "nvv4l2h264enc bitrate=4000000",
            "h264parse",
            # "qtmux",  MP4 output breaks if the process stops abruptly
            "matroskamux"]  # MKV stays playable after an abrupt stop
    else:
        pipelines += ["jpegenc"]
    pipelines += [f"filesink location={option.output_video}"]
else:
    pipelines += [
        "nvegltransform",
        "nveglglessink sync=0"]
pipeline_str = " ! ".join(pipelines)
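# For a video-file input the assembled string looks roughly like this
# (paths and sizes are illustrative):
#   uridecodebin uri=file:///path/video.mp4 ! nvvideoconvert !
#   video/x-raw(memory:NVMM),format=NV12 ! mux.sink_0 nvstreammux name=mux
#   batch-size=1 width=1280 height=720 batched-push-timeout=40000 !
#   nvinfer config-file-path=... ! nvtracker ... ! nvvideoconvert !
#   nvdsosd name=onscreendisplay ! nvegltransform ! nveglglessink sync=0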

# The ROI defaults to the full frame when not set on the command line
roi_left = 0 if option.roi_left is None else option.roi_left
roi_top = 0 if option.roi_top is None else option.roi_top
roi_width = input_width if option.roi_width is None else option.roi_width
roi_height = input_height if option.roi_height is None else option.roi_height

roi_x_max = roi_left + roi_width
roi_y_max = roi_top + roi_height

remove_engine_if_older()

Gst.init(None)
pipeline = Gst.parse_launch(pipeline_str)
osd_element = pipeline.get_by_name("onscreendisplay")
osd_sink_pad = osd_element.get_static_pad("sink")
probe_func = probe_for_video if input_video_uri else probe_for_image
osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, probe_func, 0)

loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    pass
pipeline.set_state(Gst.State.NULL)
print("Objects:", counter)