Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

adding utils for handling VideoDocument #233

Merged
merged 16 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ $(artifact):
test: devversion $(generatedcode)
pip install --upgrade -r requirements.dev
pip install -r requirements.txt
pip install -r requirements.cv
pytype $(packagename)
python3 -m pytest --doctest-modules --cov=$(packagename) --cov-report=xml

Expand Down
7 changes: 7 additions & 0 deletions mmif/serialize/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ def get(self, prop_name: str) -> Union['AnnotationProperties', JSON_PRMTV_TYPES,
def __getitem__(self, prop_name: str):
return self.get(prop_name)

def __contains__(self, item):
    """Support the ``in`` operator: True iff property *item* is set on this object."""
    try:
        self.get(item)
    except KeyError:
        return False
    return True

def is_document(self):
return isinstance(self.at_type, DocumentTypesBase)

Expand Down
1 change: 1 addition & 0 deletions mmif/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from mmif.utils import video_document_helper
190 changes: 190 additions & 0 deletions mmif/utils/video_document_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from typing import List, Union, Tuple

import numpy as np

from mmif import Annotation, Document, Mmif
from mmif.vocabulary import DocumentTypes

# CV-related packages are optional dependencies (installed via
# ``pip install mmif-python[cv]``, see requirements.cv), so they are imported
# behind a guard that produces an actionable error message.
# bugfix: ``from PIL import Image`` used to sit above this guard even though
# pillow is one of the optional CV dependencies; a missing pillow install would
# then raise a bare ImportError before the helpful message below could be shown.
try:
    import cv2
    import ffmpeg
    import PIL
    from PIL import Image
except ImportError as e:
    raise ImportError(
        f"Optional package {e.name} not found. You might want to install Computer-Vision dependencies by running `pip install mmif-python[cv]`")

# document property key under which ``capture()`` caches the video frame rate
FPS_DOCPROP_KEY = 'fps'
# maps accepted time-unit aliases to their canonical unit names
UNIT_NORMALIZATION = {
    alias: canonical
    for canonical, aliases in (
        ('millisecond', ('ms', 'msec', 'millisecond', 'milliseconds')),
        ('second', ('s', 'sec', 'second', 'seconds')),
        ('frame', ('frame', 'f')),
    )
    for alias in aliases
}


def capture(vd: Document) -> cv2.VideoCapture:
    """
    Opens the video file of a VideoDocument with OpenCV.

    Side effect: records the video's FPS on the document under
    ``FPS_DOCPROP_KEY`` so later calls to ``get_framerate`` are cheap.

    :param vd: VideoDocument object that holds the video file location
    :return: an opened ``cv2.VideoCapture`` object
    :raises ValueError: when the input is None or not a VideoDocument
    """
    if vd is None or vd.at_type != DocumentTypes.VideoDocument:
        # bugfix: the old message ("The document does not exist.") was misleading
        # when a document of the wrong type was passed
        raise ValueError(f'The document is not a VideoDocument: {vd}')
    v = cv2.VideoCapture(vd.location_path())
    vd.add_property(FPS_DOCPROP_KEY, v.get(cv2.CAP_PROP_FPS))
    return v


def get_framerate(vd: Document) -> float:
    """
    Returns the frame rate of a video document.

    Prefers a rate already recorded on the document (``fps`` or ``framerate``
    property); otherwise probes the video file via ``capture``, which caches
    the value on the document.

    :param vd: VideoDocument object that holds the video file location
    :return: frames per second
    :raises ValueError: when the input is None or not a VideoDocument
    """
    if vd is None or vd.at_type != DocumentTypes.VideoDocument:
        # bugfix: the old message ("The document does not exist.") was misleading
        # when a document of the wrong type was passed
        raise ValueError(f'The document is not a VideoDocument: {vd}')
    for k in (FPS_DOCPROP_KEY, 'framerate'):
        if k in vd:
            return vd.get_property(k)
    capture(vd)  # side effect: caches fps on the document under FPS_DOCPROP_KEY
    return vd.get_property(FPS_DOCPROP_KEY)


def extract_frames_as_images(vd: Document, framenums: List[int], as_PIL: bool = False) -> List[Union[np.ndarray, PIL.Image.Image]]:
    """
    Extracts frames from a video document as a list of numpy arrays.
    Use `sample_frames` function in this module to get the list of frame numbers first.

    :param vd: VideoDocument object that holds the video file location
    :param framenums: integers representing the frame numbers to extract
    :param as_PIL: use PIL.Image instead of numpy.ndarray
    :return: frames as a list of numpy arrays or PIL.Image objects
    """
    video = capture(vd)
    extracted = []
    for num in framenums:
        video.set(cv2.CAP_PROP_POS_FRAMES, num)
        success, img = video.read()
        if not success:
            # stop at the first unreadable frame (e.g. past end of video)
            break
        # BGR (OpenCV) -> RGB channel flip when converting to PIL
        extracted.append(Image.fromarray(img[:, :, ::-1]) if as_PIL else img)
    return extracted


def extract_mid_frame(mmif: Mmif, tf: Annotation, as_PIL: bool = False) -> Union[np.ndarray, PIL.Image.Image]:
    """
    Extracts the frame at the midpoint of a TimeFrame annotation's time span.

    :param mmif: MMIF object providing the time unit and the video document
    :param tf: TimeFrame annotation with ``start`` and ``end`` properties
    :param as_PIL: use PIL.Image instead of numpy.ndarray
    :return: the middle frame as a numpy array or a PIL.Image object
    """
    unit = get_annotation_property(mmif, tf, 'timeUnit')
    video_doc = mmif[get_annotation_property(mmif, tf, 'document')]
    fps = get_framerate(video_doc)
    start_frame = convert(float(tf.get_property('start')), unit, 'frame', fps)
    end_frame = convert(float(tf.get_property('end')), unit, 'frame', fps)
    midframe = (start_frame + end_frame) // 2
    return extract_frames_as_images(video_doc, [midframe], as_PIL=as_PIL)[0]


def sample_frames(start_frame: int, end_frame: int, sample_ratio: int = 1) -> List[int]:
    """
    Helper function to sample frames from a time interval.
    When start_frame is 0 and end_frame is X, this function basically works as "cutoff".

    :param start_frame: start frame of the interval (inclusive)
    :param end_frame: end frame of the interval (exclusive)
    :param sample_ratio: sample ratio or sample step, default is 1, meaning all consecutive frames are sampled
    :return: sampled frame numbers in ascending order
    :raises ValueError: when sample_ratio is smaller than 1
    """
    sample_ratio = int(sample_ratio)
    if sample_ratio < 1:
        # bugfix: old message claimed the ratio must be "greater than 1",
        # but the check accepts 1 (every frame)
        raise ValueError(f"Sample ratio must be 1 or larger, but got {sample_ratio}")
    return list(range(start_frame, end_frame, sample_ratio))


def convert(time: Union[int, float], in_unit: str, out_unit: str, fps: float) -> Union[int, float]:
    """
    Converts a time value between 'frame', 'second', and 'millisecond' units.

    :param time: time value to convert
    :param in_unit: unit of the input value (any alias in ``UNIT_NORMALIZATION``)
    :param out_unit: unit to convert to (any alias in ``UNIT_NORMALIZATION``)
    :param fps: frames per second, used for frame<->time conversions
    :return: an int for frame numbers, int or float for second/millisecond values
    :raises ValueError: when either unit name is not supported
    """
    try:
        in_unit = UNIT_NORMALIZATION[in_unit]
    except KeyError:
        raise ValueError(f"Not supported time unit: {in_unit}")
    try:
        out_unit = UNIT_NORMALIZATION[out_unit]
    except KeyError:
        raise ValueError(f"Not supported time unit: {out_unit}")
    # s>s, ms>ms, f>f
    if in_unit == out_unit:
        return time
    elif out_unit == 'frame':
        # time -> frame: truncate to an integer frame number
        if in_unit == 'millisecond':
            return int(time / 1000 * fps)
        elif in_unit == 'second':
            return int(time * fps)
    # s>ms
    elif in_unit == 'second':
        return time * 1000
    # ms>s
    elif in_unit == 'millisecond':
        # bugfix: was ``time // 1000`` (floor division), which silently dropped
        # sub-second precision (e.g. 3337 ms -> 3 instead of 3.337), unlike the
        # float-returning frame->second branch below
        return time / 1000
    # f>ms, f>s
    return (time / fps) if out_unit == 'second' else (time / fps * 1000)  # pytype: disable=bad-return-type

def get_annotation_property(mmif, annotation, prop_name):
    """
    Retrieves a property value from an annotation, falling back to the
    ``contains`` metadata of the annotation's parent view when the annotation
    itself does not carry the property.

    :param mmif: Mmif object that contains the annotation's parent view
    :param annotation: annotation to read the property from
    :param prop_name: name of the property
    :return: the property value
    :raises KeyError: when the property is found in neither place
    """
    # TODO (krim @ 7/18/23): this probably should be merged to the main mmif.serialize package
    if prop_name in annotation:
        return annotation.get_property(prop_name)
    try:
        return mmif[annotation.parent].metadata.contains[annotation.at_type][prop_name]
    except KeyError as e:
        # chain the original KeyError so the failed lookup stays debuggable
        raise KeyError(f"Annotation {annotation.id} does not have {prop_name} property.") from e

def convert_timepoint(mmif: Mmif, timepoint: Annotation, out_unit: str) -> Union[int, float]:
    """
    Converts a time point included in an annotation to a different time unit.
    The input annotation must have ``timePoint`` property.

    :param mmif: input MMIF to obtain fps and input timeunit
    :param timepoint: annotation with ``timePoint`` property
    :param out_unit: time unit to which the point is converted
    :return: frame number (integer) or second/millisecond (float) of input timepoint
    """
    src_unit = get_annotation_property(mmif, timepoint, 'timeUnit')
    video_doc = mmif[get_annotation_property(mmif, timepoint, 'document')]
    fps = get_framerate(video_doc)
    return convert(timepoint.get_property('timePoint'), src_unit, out_unit, fps)

def convert_timeframe(mmif: Mmif, timeframe: Annotation, out_unit: str) -> Union[Tuple[int, int], Tuple[float, float]]:
    """
    Converts start and end points in a TimeFrame annotation to a different time unit.

    :param mmif: input MMIF to obtain fps and input timeunit
    :param timeframe: ``TimeFrame`` type annotation
    :param out_unit: time unit to which the points are converted
    :return: tuple of frame numbers (integer) or seconds/milliseconds (float) of input start and end
    """
    in_unit = get_annotation_property(mmif, timeframe, 'timeUnit')
    vd = mmif[get_annotation_property(mmif, timeframe, 'document')]
    # hoisted: get_framerate() may open the video file, so call it only once
    fps = get_framerate(vd)
    return (convert(timeframe.get_property('start'), in_unit, out_unit, fps),
            convert(timeframe.get_property('end'), in_unit, out_unit, fps))



def framenum_to_second(video_doc: Document, frame: int):
    """Converts a frame number of the given video document to seconds."""
    return convert(frame, 'f', 's', get_framerate(video_doc))


def framenum_to_millisecond(video_doc: Document, frame: int):
    """Converts a frame number of the given video document to milliseconds."""
    return convert(frame, 'f', 'ms', get_framerate(video_doc))


def second_to_framenum(video_doc: Document, second) -> int:
    """Converts a time point in seconds to a frame number of the given video document."""
    return int(convert(second, 's', 'f', get_framerate(video_doc)))


def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int:
    """Converts a time point in milliseconds to a frame number of the given video document."""
    return int(convert(millisecond, 'ms', 'f', get_framerate(video_doc)))
3 changes: 3 additions & 0 deletions requirements.cv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pillow
opencv-python
ffmpeg-python
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class DevelopCommand(setuptools.command.develop.develop):
with open('requirements.txt') as requirements:
requires = requirements.readlines()

with open('requirements.cv') as requirements:
cv_requires = requirements.readlines()

setuptools.setup(
name=name,
version=version,
Expand All @@ -287,6 +290,7 @@ class DevelopCommand(setuptools.command.develop.develop):
},
install_requires=requires,
extras_require={
'cv': cv_requires,
'dev': [
'pytest',
'pytest-pep8',
Expand Down
62 changes: 62 additions & 0 deletions tests/test_utils_videodocument_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import unittest
import pytest

import numpy as np
from PIL import Image

from mmif import Mmif, Document, AnnotationTypes
from mmif.utils import video_document_helper as vdh


class TestUtilsVideoDocuments(unittest.TestCase):
    """Tests for the time/frame conversion utilities in mmif.utils.video_document_helper."""

    def setUp(self):
        # Fixture: an unvalidated MMIF holding one VideoDocument whose 'fps'
        # property is set directly, so no test needs to open a real video file.
        self.fps = 29.97
        self.mmif_obj = Mmif(validate=False)
        self.a_view = self.mmif_obj.new_view()
        self.video_doc = Document({
            "@type": "http://mmif.clams.ai/vocabulary/VideoDocument/v1",
            "properties": {
                "mime": "video",
                "id": "d1",
                "location": "file:///home/snewman/Documents/test_vid.mp4"
            }
        })
        self.video_doc.add_property('fps', self.fps)
        self.mmif_obj.add_document(self.video_doc)

    def test_get_framerate(self):
        # fps must come from the document property (the file path doesn't exist)
        self.assertAlmostEqual(29.97, vdh.get_framerate(self.video_doc), places=0)

    def test_frames_to_seconds(self):
        # 100 frames / 29.97 fps ~= 3.337 seconds
        self.assertAlmostEqual(3.337, vdh.framenum_to_second(self.video_doc, 100), places=0)

    def test_frames_to_milliseconds(self):
        self.assertAlmostEqual(3337.0, vdh.framenum_to_millisecond(self.video_doc, 100), places=0)

    def test_seconds_to_frames(self):
        self.assertAlmostEqual(100, vdh.second_to_framenum(self.video_doc, 3.337), places=0)

    def test_milliseconds_to_frames(self):
        self.assertAlmostEqual(100, vdh.millisecond_to_framenum(self.video_doc, 3337.0), places=0)

    def test_sample_frames(self):
        # 2.5-second span sampled at a step equal to the (truncated) fps
        s_frame = vdh.second_to_framenum(self.video_doc, 3)
        e_frame = vdh.second_to_framenum(self.video_doc, 5.5)
        # note that int(29.97) = 29
        self.assertEqual(3, len(vdh.sample_frames(s_frame, e_frame, self.fps)))
        # a 2-second span with a 60-frame step yields only the start frame
        s_frame = vdh.second_to_framenum(self.video_doc, 3)
        e_frame = vdh.second_to_framenum(self.video_doc, 5)
        self.assertEqual(1, len(vdh.sample_frames(s_frame, e_frame, 60)))

    def test_convert_timepoint(self):
        # timeUnit carried on the annotation itself
        timepoint_ann = self.a_view.new_annotation(AnnotationTypes.BoundingBox, timePoint=3, timeUnit='second', document='d1')
        self.assertEqual(vdh.convert(3, 's', 'f', self.fps), vdh.convert_timepoint(self.mmif_obj, timepoint_ann, 'f'))

    def test_convert_timeframe(self):
        # timeUnit carried on the view's `contains` metadata, not the annotation
        self.a_view.metadata.new_contain(AnnotationTypes.TimeFrame, timeUnit='frame', document='d1')
        timeframe_ann = self.a_view.new_annotation(AnnotationTypes.TimeFrame, start=100, end=200)
        for times in zip((3.337, 6.674), vdh.convert_timeframe(self.mmif_obj, timeframe_ann, 's')):
            self.assertAlmostEqual(*times, places=0)

if __name__ == '__main__':
unittest.main()