25
25
DASH_ROOT = "/var/www/video/dash"
26
26
HLS_ROOT = "/var/www/video/hls"
27
27
MP4_ROOT = "/var/www/video/mp4"
28
+ CACHE_RAW_YUV_ROOT = "/var/www/video/rawyuv"
29
+ CACHE_DECODED_YUV_ROOT = "/var/www/video/decodedyuv"
28
30
29
31
HW_ACC_TYPE = os .getenv ("HW_ACC_TYPE" ,"sw" )
30
32
HW_DEVICE = os .getenv ("HW_DEVICE" ,None )
33
+ SCENARIO = os .getenv ("SCENARIO" ,"transcode" )
31
34
32
35
fps_regex = re .compile (
33
36
r"\s*frame=\s*(?P<frame_count>\d+)\s*fps=\s*(?P<fps>\d+\.?\d*).*"
@@ -54,7 +57,7 @@ def get_fps(next_line,start_time):
54
57
return {"fps" :round (fps ,1 ), "speed" :round (speed ,3 ), "frames" :frame_count , "start" :round (start_time ,3 ), "duration" :round (now - start_time ,3 ), "end" :round (now ,3 ), "status" : "active" }
55
58
return {}
56
59
57
- def execute (idx , name , cmd ):
60
+ def execute (idx , name , cmd , kafka = True ):
58
61
p = subprocess .Popen (cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE , bufsize = 1 , universal_newlines = True )
59
62
p .poll ()
60
63
start_time = time .time ()
@@ -84,6 +87,46 @@ def execute(idx, name, cmd):
84
87
print ("Exception: {}" .format (e ))
85
88
return p .returncode
86
89
90
+ def decode_yuv (yuv_path ,in_stream_name ,nframes = None ):
91
+ try :
92
+ yuv_name = yuv_path + "/" + in_stream_name .split ("/" )[- 1 ].replace (".mp4" ,".yuv" )
93
+ if not os .path .exists (path ): makedirs (yuv_path )
94
+ if os .path .exists (yuv_name ): return
95
+ if nframes :
96
+ cmd = ["ffmpeg" , "-hide_banner" , "-i" ,in_stream_name , "-vcodec rawvideo" , "-an" ,"-frames:v" , str (nframes ),"-y" ,yuv_name ]
97
+ else :
98
+ cmd = ["ffmpeg" , "-hide_banner" , "-i" ,in_stream_name , "-vcodec rawvideo" , "-an" ,"-y" ,yuv_name ]
99
+ print (cmd , flush = True )
100
+ execute (5001 ,"decoded" ,cmd ,kafka = False )
101
+ except Exception as e :
102
+ print ("Exception: {}" .format (e ))
103
+ pass
104
+ return yuv_name
105
+
106
+ def measure_quality_vamf (idx ,name , raw_mp4_path , target_mp4_path ,width ,height , nframes = 100 ):
107
+ vmaf_score = None
108
+ model_path = "/home/models/"
109
+ try :
110
+ if width >= 1920 and height >= 1080 :
111
+ model_name = model_path + "vmaf_4k_v0.6.1.json"
112
+ else :
113
+ model_name = model_path + "vmaf_v0.6.1.json"
114
+ log_path = target_mp4_path + ".json"
115
+ framerate = 24
116
+ cmd = ["ffmpeg" , "-r" , str (framerate ), "-i" ,raw_mp4_path , "-r" , str (framerate ), "-i" , target_mp4_path , "-lavfi" , "[0:v]trim=end_frame={},scale={}:{}:flags=bicubic,setpts=PTS-STARTPTS[reference];[1:v]trim=end_frame={},setpts=PTS-STARTPTS[distorted];[distorted][reference]libvmaf=log_fmt=json:log_path={}:model_path={}" .format (nframes ,width ,height ,nframes ,log_path ,model_name ),"-f" , "null" , "-" ]
117
+ print (cmd ,flush = True )
118
+ execute (str (idx + 1000 ),name ,cmd ,kafka = False )
119
+ with open (log_path ) as f :
120
+ obj = json .load (f )
121
+ sinfo = {"id" : str (idx + 1000 ), "stream" :name }
122
+ vmaf_score = float (obj ["pooled_metrics" ]["vmaf" ]["mean" ])
123
+ sinfo .update ({"vmaf" :vmaf_score })
124
+ producer .send (KAFKA_WORKLOAD_TOPIC , json .dumps (sinfo ))
125
+
126
+ except Exception as e :
127
+ print ("Exception: {}" .format (e ))
128
+ return vmaf_score
129
+
87
130
def process_stream_vods (msg ):
88
131
stream_name = msg ["name" ]
89
132
stream_type = msg ["output" ]["type" ]
@@ -110,7 +153,8 @@ def process_stream_vods(msg):
110
153
111
154
if zk .process_start ():
112
155
try :
113
- cmd = FFMpegCmd (ARCHIVE_ROOT + "/" + stream_name , target_root + "/" + stream_name , stream_type , params = stream_parameters , acc_type = HW_ACC_TYPE , loop = loop , device = HW_DEVICE ).cmd ()
156
+ input_stream = ARCHIVE_ROOT + "/" + stream_name
157
+ cmd = FFMpegCmd (input_stream , target_root + "/" + stream_name , stream_type , params = stream_parameters , acc_type = HW_ACC_TYPE , loop = loop , device = HW_DEVICE ).cmd ()
114
158
if cmd :
115
159
print (cmd , flush = True )
116
160
r = execute (idx , stream_name , cmd )
@@ -130,7 +174,7 @@ def process_stream_lives(msg):
130
174
stream_type = msg ["output" ]["type" ]
131
175
target = msg ["output" ]["target" ]
132
176
loop = msg ["loop" ]
133
- idx = msg ["idx" ] if "idx" in msg .keys () else int (random .random ()* 10000 )
177
+ idx = int ( msg ["idx" ]) if "idx" in msg .keys () else int (random .random ()* 10000 )
134
178
stream = stream_type + "/" + stream_name
135
179
136
180
if not isfile (ARCHIVE_ROOT + "/" + stream_name ):
@@ -156,13 +200,19 @@ def process_stream_lives(msg):
156
200
157
201
if zk .process_start ():
158
202
try :
159
- cmd = FFMpegCmd (ARCHIVE_ROOT + "/" + stream_name , target_name , stream_type , params = stream_parameters , acc_type = HW_ACC_TYPE , loop = loop , device = HW_DEVICE ).cmd ()
203
+ input_stream = ARCHIVE_ROOT + "/" + stream_name
204
+ cmd = FFMpegCmd (input_stream , target_name , stream_type , params = stream_parameters , acc_type = HW_ACC_TYPE , loop = loop , device = HW_DEVICE ).cmd ()
160
205
161
206
if cmd :
162
207
print (cmd , flush = True )
163
208
r = execute (idx , stream_name , cmd )
164
209
if r :
165
210
raise Exception ("status code: " + str (r ))
211
+ if SCENARIO == "encode" :
212
+ width = stream_parameters ["renditions" ][0 ][0 ]
213
+ height = stream_parameters ["renditions" ][0 ][1 ]
214
+ mp4_file = cmd [- 1 ]
215
+ measure_quality_vamf (idx ,stream_name ,raw_mp4_path = input_stream ,target_mp4_path = mp4_file ,width = width ,height = height )
166
216
zk .process_end ()
167
217
except :
168
218
print (traceback .format_exc (), flush = True )
0 commit comments