-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvideo_utils.py
132 lines (99 loc) · 3.6 KB
/
video_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import cv2
import os
import logging
# logging.basicConfig(level=logging.DEBUG)
from skimage import metrics as skimage_metrics
def is_similar_frame(f1, f2, resize_to=(64, 64), thresh=0.5, return_score=False):
thresh = float(os.getenv("FRAME_SIMILARITY_THRESH", thresh))
if f1 is None or f2 is None:
return False
if isinstance(f1, str) and os.path.exists(f1):
try:
f1 = cv2.imread(f1)
except Exception as ex:
logging.exception(ex, exc_info=True)
return False
if isinstance(f2, str) and os.path.exists(f2):
try:
f2 = cv2.imread(f2)
except Exception as ex:
logging.exception(ex, exc_info=True)
return False
if resize_to:
f1 = cv2.resize(f1, resize_to)
f2 = cv2.resize(f2, resize_to)
if len(f1.shape) == 3:
f1 = f1[:, :, 0]
if len(f2.shape) == 3:
f2 = f2[:, :, 0]
score = skimage_metrics.structural_similarity(f1, f2, multichannel=False)
if return_score:
return score
if score >= thresh:
return True
return False
def get_interest_frames_from_video(
video_path,
frame_similarity_threshold=0.5,
similarity_context_n_frames=3,
skip_n_frames=0.5,
output_frames_to_dir=None,
):
skip_n_frames = float(os.getenv("SKIP_N_FRAMES", skip_n_frames))
important_frames = []
fps = 0
video_length = 0
try:
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if skip_n_frames < 1:
skip_n_frames = int(skip_n_frames * fps)
logging.info(f"skip_n_frames: {skip_n_frames}")
video_length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
for frame_i in range(length + 1):
read_flag, current_frame = video.read()
if not read_flag:
break
if skip_n_frames > 0:
if frame_i % skip_n_frames != 0:
continue
frame_i += 1
found_similar = False
for context_frame_i, context_frame in reversed(
important_frames[-1 * similarity_context_n_frames :]
):
if is_similar_frame(
context_frame, current_frame, thresh=frame_similarity_threshold
):
logging.debug(f"{frame_i} is similar to {context_frame_i}")
found_similar = True
break
if not found_similar:
logging.debug(f"{frame_i} is added to important frames")
important_frames.append((frame_i, current_frame))
if output_frames_to_dir:
if not os.path.exists(output_frames_to_dir):
os.mkdir(output_frames_to_dir)
output_frames_to_dir = output_frames_to_dir.rstrip("/")
cv2.imwrite(
f"{output_frames_to_dir}/{str(frame_i).zfill(10)}.png",
current_frame,
)
logging.info(
f"{len(important_frames)} important frames will be processed from {video_path} of length {length}"
)
except Exception as ex:
logging.exception(ex, exc_info=True)
return (
[i[0] for i in important_frames],
[i[1] for i in important_frames],
fps,
video_length,
)
if __name__ == "__main__":
import sys
imp_frames = get_interest_frames_from_video(
sys.argv[1], output_frames_to_dir="./frames/"
)
print([i[0] for i in imp_frames])