-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain_stream_optimized.py
118 lines (100 loc) · 4.13 KB
/
main_stream_optimized.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from time import sleep, time
from multiprocessing import Process, Queue
from queue import Full as queue_is_full
import hydra
from tqdm import tqdm
from nodes.VideoReader import VideoReader
from nodes.ShowNode import ShowNode
from nodes.VideoSaverNode import VideoSaverNode
from nodes.DetectionTrackingNodes import DetectionTrackingNodes
from nodes.TrackerInfoUpdateNode import TrackerInfoUpdateNode
from nodes.CalcStatisticsNode import CalcStatisticsNode
from nodes.FlaskServerVideoNode import VideoServer
from nodes.KafkaProducerNode import KafkaProducerNode
from elements.VideoEndBreakElement import VideoEndBreakElement
from utils_local.utils import check_and_set_env_var
# When True, the reader and processor loops print per-iteration timing
# (in milliseconds) and a message whenever a frame is dropped on a full queue.
PRINT_PROFILE_INFO = False
def proc_frame_reader(queue_out: Queue, config: dict, time_sleep_start: int):
    """Read elements from the video source and offer them to ``queue_out``.

    After a warm-up countdown, every element yielded by
    ``VideoReader.process()`` is put on the queue without blocking; if the
    queue is full the element is dropped instead of waiting for the consumer.
    The ``VideoEndBreakElement`` marker is treated differently: it is put with
    a bounded blocking wait, so that a momentarily full queue does not make
    the consumer miss the end of the stream.  The loop stops after the marker
    has been handled.

    Args:
        queue_out: Queue that receives the elements.
        config: Application config; only ``config["video_reader"]`` is read here.
        time_sleep_start: Number of one-second warm-up steps performed before
            the reader is created.
    """
    # Upper bound, in seconds, on waiting for a free slot for the end marker.
    # Bounded so this process still terminates if the consumer has died.
    end_marker_timeout = 10.0

    sleep_message = f"Система разогревается.. sleep({time_sleep_start})"
    for _ in tqdm(range(time_sleep_start), desc=sleep_message):
        sleep(1)

    video_reader = VideoReader(config["video_reader"])
    for frame_element in video_reader.process():
        ts0 = time()
        is_end_marker = isinstance(frame_element, VideoEndBreakElement)
        try:
            if is_end_marker:
                # Regular frames may be skipped, the end marker should not be.
                queue_out.put(frame_element, timeout=end_marker_timeout)
            else:
                queue_out.put_nowait(frame_element)
        except queue_is_full:
            if PRINT_PROFILE_INFO:
                print("queue_is_full => pass frame")
        else:
            if PRINT_PROFILE_INFO:
                print(f"PROC_FRAME_READER: {(time()-ts0) * 1000:.0f} ms: ")
        if is_end_marker:
            break
def proc_proceessor(queue_in: Queue, config: dict):
    """Consume elements from ``queue_in`` and run them through the pipeline.

    Each element goes through detection/tracking, tracker-info update and
    statistics calculation, optionally the Kafka producer, then the show
    node.  Afterwards it is optionally handed to the video saver and the web
    video server, whose return values are ignored.  The loop ends once the
    element coming out of the pipeline is a ``VideoEndBreakElement``.

    Args:
        queue_in: Queue the elements are read from (blocking ``get``).
        config: Application config; ``config["pipeline"]`` supplies the
            ``send_info_kafka``, ``save_video`` and ``show_in_web`` switches.
    """
    # Stages whose result replaces the current element, in execution order.
    chained_stages = [
        DetectionTrackingNodes(config),
        TrackerInfoUpdateNode(config),
        CalcStatisticsNode(config),
    ]
    if config["pipeline"]["send_info_kafka"]:
        chained_stages.append(KafkaProducerNode(config))
    chained_stages.append(ShowNode(config))

    # Both switches are read before any output stage is constructed.
    want_saved_video = config["pipeline"]["save_video"]
    want_web_stream = config["pipeline"]["show_in_web"]

    # Stages called only for their side effects; their result is discarded.
    output_stages = []
    if want_saved_video:
        output_stages.append(VideoSaverNode(config["video_saver_node"]))
    if want_web_stream:
        output_stages.append(VideoServer(config))

    while True:
        loop_started = time()
        element = queue_in.get()
        element_received = time()

        for stage in chained_stages:
            element = stage.process(element)
        for stage in output_stages:
            stage.process(element)

        if PRINT_PROFILE_INFO:
            report = f"PROC_PROCESSOR: {(time()-loop_started) * 1000:.0f} ms: "
            report += f"get {(element_received-loop_started) * 1000:.0f} | "
            report += f"nodes_inference {(time()-element_received) * 1000:.0f} | "
            print(report)

        if isinstance(element, VideoEndBreakElement):
            return
@hydra.main(version_base=None, config_path="configs", config_name="app_config")
def main(config) -> None:
    """Start the frame reader and the frame processor as daemon processes.

    Both processes share one queue bounded to two elements.  The function
    returns as soon as the reader process finishes; the processor is not
    joined.

    Args:
        config: Configuration object supplied by the ``hydra.main`` decorator.
    """
    warmup_seconds = 5
    frame_queue = Queue(maxsize=2)

    reader_proc = Process(
        target=proc_frame_reader,
        args=(frame_queue, config, warmup_seconds),
        name="proc_frame_reader",
        daemon=True,
    )
    processor_proc = Process(
        target=proc_proceessor,
        args=(frame_queue, config),
        name="proc_proceessor",
        daemon=True,
    )

    reader_proc.start()
    processor_proc.start()

    # Wait until the first (reader) process finishes.
    reader_proc.join()
if __name__ == "__main__":
    # Check the environment variables and set them to these defaults if absent.
    check_and_set_env_var("VIDEO_SRC", "test_videos/test_video.mp4")
    check_and_set_env_var("ROADS_JSON", "configs/entry_exit_lanes.json")
    check_and_set_env_var("TOPIC_NAME", "statistics_1")
    # NOTE(review): an int is passed here while the other defaults are strings;
    # confirm that check_and_set_env_var converts it before storing.
    check_and_set_env_var("CAMERA_ID", 1)

    ts = time()
    main()
    # ".2f" prints fixed-point minutes with two decimals.  A bare ".2" means
    # two significant digits and falls back to exponent notation for runs of
    # ten minutes or longer (e.g. "1.2e+01").
    print(f"\n total time: {(time()-ts) / 60:.2f} minute")