1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# ROI = Region of Interest
import sys
import os
from argparse import ArgumentParser
from configparser import ConfigParser
from time import time
import pyds
import cv2
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
counted_ids = set() # Menggunakan Set agar satu ID hanya dihitung satu kali
counter = dict()
last_counter = dict()
counter_history = dict()
registry = dict(empty_time=time())
counter_bg_colors = [
(0.0, 0.0, 0.0, 0.5), # R, G, B, Alpha
(0.0, 0.0, 1.0, 0.5)]
def video_info(filename):
cap = cv2.VideoCapture(filename)
if not cap.isOpened():
print("Error: Tidak dapat membuka file video.")
cap.realease()
sys.exit()
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(width)
height = int(height)
fps = int(fps)
return width, height, fps
def counter_text(d):
r = []
for key, value in d.items():
s = f"{key}={value}"
r.append(s)
return " ".join(r)
def probe_for_video(pad, info, u_data):
def show_counter(index, data):
text = display_meta.text_params[index]
text.display_text = counter_text(data)
text.x_offset, text.y_offset = 50, 50 + 50 * index
text.font_params.font_size = 15
text.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text.set_bg_clr = 1 # background true
bg_color = counter_bg_colors[index]
text.text_bg_clr.set(*bg_color)
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
if counter and registry["empty_time"] and \
time() - registry["empty_time"] > counter_age:
durasi_sepi = time() - registry["empty_time"]
counter_history.clear()
counter_history.update(counter)
counter.clear()
last_counter.clear()
# Visualisasi Kotak ROI di Layar
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_rects = 1
rect = display_meta.rect_params[0]
rect.left = roi_left
rect.top = roi_top
rect.width = roi_width
rect.height = roi_height
rect.border_width = 3
rect.border_color.set(0.0, 1.0, 0.0, 1.0) # Hijau
# Tambahkan teks counter
display_meta.num_labels = 2
show_counter(0, counter)
show_counter(1, counter_history)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
# Logika Box Crossing
l_obj = frame_meta.obj_meta_list
if l_obj is None:
if counter and not registry["empty_time"]:
registry["empty_time"] = time()
elif registry["empty_time"]:
registry["empty_time"] = None
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
obj_id = obj_meta.object_id
label = obj_meta.obj_label
# Objek yang sudah masuk kotak ROI diberi warna kuning
if obj_id in counted_ids:
obj_meta.rect_params.border_color.set(1.0, 1.0, 0.0, 1.0)
obj_meta.text_params.font_params.font_color.set(
1.0, 1.0, 0.0, 1.0)
# Koordinat objek
o_left = obj_meta.rect_params.left
o_top = obj_meta.rect_params.top
o_width = obj_meta.rect_params.width
o_height = obj_meta.rect_params.height
o_right = o_left + o_width
o_bottom = o_top + o_height
# Hitung titik tengah objek
center_x = o_left + (o_width / 2)
center_y = o_top + (o_height / 2)
# Cek apakah titik tengah objek masuk ke dalam roi box
if obj_id not in counted_ids:
if roi_left <= center_x <= roi_x_max and \
roi_top <= center_y <= roi_y_max:
if label in counter:
counter[label] += 1
else:
counter[label] = 1
counted_ids.add(obj_id)
print(f"{label} ID {obj_id} masuk sepenuhnya ke kotak")
try:
l_obj = l_obj.next
except StopIteration:
break
if last_counter != counter:
last_counter.update(counter)
print(f"Frame {frame_meta.frame_num} {counter}")
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def probe_for_image(pad, info, u_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
label = obj_meta.obj_label
if label in counter:
counter[label] += 1
else:
counter[label] = 1
try:
l_obj = l_obj.next
except StopIteration:
break
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("End-of-stream\n")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error: {err}: {debug}\n")
loop.quit()
return True
def remove_engine_if_older():
conf = ConfigParser()
conf.read(option.conf)
cf = dict(conf.items('property'))
if 'onnx-file' in cf:
model_file = cf['onnx-file']
elif 'model-file' in cf:
model_file = cf['model-file']
else:
model_file = None
if model_file:
engine_file = cf['model-engine-file']
if os.path.exists(engine_file):
engine_time = os.path.getmtime(engine_file)
model_time = os.path.getmtime(model_file)
if model_time > engine_time:
print(f"Hapus {engine_file} karena {model_file} lebih baru")
os.remove(engine_file)
BASE_DEEPSTREAM = "/opt/nvidia/deepstream/deepstream-6.0"
DS_PATH = f"{BASE_DEEPSTREAM}/samples/configs/deepstream-app"
TRACKER_LIB = f"{BASE_DEEPSTREAM}/lib/libnvds_nvmultiobjecttracker.so"
help_roi_left = f"region of interest left"
help_roi_top = f"region of interest top"
help_roi_width = f"region of interest width"
help_roi_height = f"region of interest height"
conf_file = os.path.join(DS_PATH, "config_infer_primary_nano.txt")
help_conf = f"default {conf_file}"
conf_tracker_file = f"{DS_PATH}/config_tracker_NvDCF_accuracy.txt"
help_conf_tracker = f"default {conf_tracker_file}"
counter_age = 5 # seconds
help_age = f"default {counter_age}"
pars = ArgumentParser()
pars.add_argument("--input-video", help="default dari kamera")
pars.add_argument("--output-video", help="default ke layar")
pars.add_argument("--input-width", type=int)
pars.add_argument("--input-height", type=int)
pars.add_argument("--fps", type=int)
pars.add_argument("--roi-left", help=help_roi_left, type=int)
pars.add_argument("--roi-top", help=help_roi_top, type=int)
pars.add_argument("--roi-width", help=help_roi_width, type=int)
pars.add_argument("--roi-height", help=help_roi_height, type=int)
pars.add_argument("--conf", default=conf_file, help=help_conf)
pars.add_argument(
"--conf-tracker", default=conf_tracker_file, help=help_conf_tracker)
pars.add_argument(
"--counter-age", default=counter_age, help=help_age, type=int)
option = pars.parse_args(sys.argv[1:])
counter_age = option.counter_age * 5
input_video_uri = None
if option.input_video:
if os.path.exists(option.input_video):
input_video = os.path.realpath(option.input_video)
input_width, input_height, fps = video_info(input_video)
prefix, ext = os.path.splitext(option.input_video)
if ext.lower() == '.jpg':
pipelines = [f"filesrc location={option.input_video}"]
else:
input_video_uri = f"file://{input_video}"
else:
if option.input_width:
input_width = option.input_width
input_height = option.input_height
else:
input_width = 1280
input_height = 720
if option.fps:
fps = option.fps
else:
fps = 15 # standar aman RTSP
input_video_uri = option.input_video
if input_video_uri:
pipelines = [f"uridecodebin uri={input_video_uri}"]
else:
pipelines += ["jpegparse", "nvv4l2decoder"]
pipelines += [
"nvvideoconvert",
"video/x-raw(memory:NVMM),format=NV12",
("mux.sink_0 nvstreammux name=mux batch-size=1 "
f"width={input_width} height={input_height} "
"batched-push-timeout=40000")]
else:
# Standar kamera IMX219
input_width = 3264
input_height = 1848
fps = 28
pipelines = [
"nvarguscamerasrc bufapi-version=true",
(f"video/x-raw(memory:NVMM),"
f"width={input_width},height={input_height},"
f"format=NV12,framerate={option.fps}/1"),
("mux.sink_0 nvstreammux name=mux batch-size=1 "
f"width={input_width} height={input_height} "
"live-source=1")]
pipelines += [f"nvinfer config-file-path={option.conf}"]
if input_video_uri:
pipelines += [
("nvtracker "
f"tracker-width={input_width} "
f"tracker-height={input_height} "
f"ll-lib-file={TRACKER_LIB} "
f"ll-config-file={option.conf_tracker}")]
pipelines += ["nvvideoconvert", "nvdsosd name=onscreendisplay"]
if option.output_video:
pipelines += ["nvvideoconvert"]
if input_video_uri:
pipelines += [
"nvv4l2h264enc bitrate=4000000",
"h264parse",
# "qtmux", MP4 gagal bila mati mendadak
"matroskamux"] # MKV tak masalah mati mendadak
else:
pipelines += ["jpegenc"]
pipelines += [f"filesink location={option.output_video}"]
else:
pipelines += [
"nvegltransform",
"nveglglessink sync=0"]
pipeline_str = " ! ".join(pipelines)
if option.roi_left is None:
roi_left = 0
else:
roi_left = option.roi_left
if option.roi_width is None:
roi_width = input_width
else:
roi_width = option.roi_width
if option.roi_top is None:
roi_top = 0
else:
roi_top = option.roi_top
if option.roi_height is None:
roi_height = input_height
else:
roi_height = option.roi_height
roi_x_max = roi_left + roi_width
roi_y_max = roi_top + roi_height
remove_engine_if_older()
Gst.init(None)
pipeline = Gst.parse_launch(pipeline_str)
osd_element = pipeline.get_by_name("onscreendisplay")
osd_sink_pad = osd_element.get_static_pad("sink")
probe_func = input_video_uri and probe_for_video or probe_for_image
osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, probe_func, 0)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
if input_video_uri:
bus.connect("message", bus_call, loop)
bus.connect(
"message", lambda b,
m: loop.quit() if m.type == Gst.MessageType.EOS else None)
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except KeyboardInterrupt:
pass
pipeline.set_state(Gst.State.NULL)
print("Objects:", counter)