def __contains__(self, item):
    """
    Supports the ``in`` operator by probing :meth:`get`.

    :param item: name of the property to look up
    :return: True when ``self.get(item)`` succeeds, False when it raises ``KeyError``
    """
    try:
        self.get(item)
    except KeyError:
        return False
    else:
        return True
FPS_DOCPROP_KEY = 'fps'
# Maps every accepted spelling of a time unit to its canonical name.
UNIT_NORMALIZATION = {
    'ms': 'millisecond',
    'msec': 'millisecond',
    'millisecond': 'millisecond',
    'milliseconds': 'millisecond',
    's': 'second',
    'sec': 'second',
    'second': 'second',
    'seconds': 'second',
    'frame': 'frame',
    'f': 'frame',
}


def _validate_video_document(vd) -> None:
    """Raises ValueError unless ``vd`` is a non-None document typed as VideoDocument."""
    if vd is None or vd.at_type != DocumentTypes.VideoDocument:
        raise ValueError('The document does not exist or is not a VideoDocument.')


def capture(vd: Document) -> cv2.VideoCapture:
    """
    Creates a ``cv2.VideoCapture`` from the location path of a video document
    and stores the frame rate reported by OpenCV on the document
    (under the ``fps`` property).

    :param vd: VideoDocument object that holds the video file location
    :return: the ``cv2.VideoCapture`` object; the caller is responsible for releasing it
    :raises ValueError: when ``vd`` is None or is not a VideoDocument
    """
    _validate_video_document(vd)
    v = cv2.VideoCapture(vd.location_path())
    vd.add_property(FPS_DOCPROP_KEY, v.get(cv2.CAP_PROP_FPS))
    return v


def get_framerate(vd: Document) -> float:
    """
    Gets the frame rate of a video document.
    A value already stored on the document (``fps`` or ``framerate`` property) is preferred;
    otherwise the video is opened once to read the frame rate, which is then stored on the document.

    :param vd: VideoDocument object that holds the video file location
    :return: frames per second
    :raises ValueError: when ``vd`` is None or is not a VideoDocument
    """
    _validate_video_document(vd)
    for key in (FPS_DOCPROP_KEY, 'framerate'):
        if key in vd:
            return vd.get_property(key)
    # capture() records the fps on the document as a side effect; the capture
    # object itself is not needed here, so release it instead of leaking it.
    capture(vd).release()
    return vd.get_property(FPS_DOCPROP_KEY)


def extract_frames_as_images(vd: Document, framenums: List[int], as_PIL: bool = False) -> List[Union[np.ndarray, PIL.Image.Image]]:
    """
    Extracts frames from a video document as a list of numpy arrays.
    Use `sample_frames` function in this module to get the list of frame numbers first.
    Extraction stops at the first frame that cannot be read, so the returned list
    can be shorter than ``framenums``.

    :param vd: VideoDocument object that holds the video file location
    :param framenums: integers representing the frame numbers to extract
    :param as_PIL: use PIL.Image instead of numpy.ndarray
    :return: frames as a list of numpy arrays or PIL.Image objects
    """
    frames = []
    video = capture(vd)
    try:
        for framenum in framenums:
            video.set(cv2.CAP_PROP_POS_FRAMES, framenum)
            ret, frame = video.read()
            if not ret:
                break
            # reversing the last axis turns OpenCV's BGR channel order into RGB for PIL
            frames.append(Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame)
    finally:
        # always free the underlying video handle, even when reading fails midway
        video.release()
    return frames


def extract_mid_frame(mmif: Mmif, tf: Annotation, as_PIL: bool = False) -> Union[np.ndarray, PIL.Image.Image]:
    """
    Extracts the middle frame of a time interval annotation from its video document.

    :param mmif: MMIF object that holds the annotation and the video document
    :param tf: annotation with ``start`` and ``end`` properties
    :param as_PIL: use PIL.Image instead of numpy.ndarray
    :return: the middle frame as a numpy array or a PIL.Image object
    :raises IndexError: when the middle frame cannot be read from the video
    """
    timeunit = get_annotation_property(mmif, tf, 'timeUnit')
    vd = mmif[get_annotation_property(mmif, tf, 'document')]
    fps = get_framerate(vd)
    endpoints = (
        convert(float(tf.get_property(timepoint_propkey)), timeunit, 'frame', fps)
        for timepoint_propkey in ('start', 'end')
    )
    # when the unit is already frames, convert() hands the float back unchanged,
    # so cast to make sure the frame index is an integer
    midframe = int(sum(endpoints) // 2)
    return extract_frames_as_images(vd, [midframe], as_PIL=as_PIL)[0]


def sample_frames(start_frame: int, end_frame: int, sample_ratio: int = 1) -> List[int]:
    """
    Helper function to sample frames from a time interval.
    When start_frame is 0 and end_frame is X, this function basically works as "cutoff".

    :param start_frame: start frame of the interval (inclusive)
    :param end_frame: end frame of the interval (exclusive)
    :param sample_ratio: sample ratio or sample step, default is 1, meaning all consecutive frames are sampled.
                         Non-integer values are truncated with ``int()``.
    :return: list of sampled frame numbers
    :raises ValueError: when the (truncated) sample ratio is smaller than 1
    """
    sample_ratio = int(sample_ratio)
    if sample_ratio < 1:
        raise ValueError(f"Sample ratio must be at least 1, but got {sample_ratio}")
    return list(range(start_frame, end_frame, sample_ratio))


def convert(time: Union[int, float], in_unit: str, out_unit: str, fps: float) -> Union[int, float]:
    """
    Converts a time value between milliseconds, seconds and frame numbers.

    :param time: time value to convert
    :param in_unit: unit of the input value, any key of ``UNIT_NORMALIZATION``
    :param out_unit: unit of the output value, any key of ``UNIT_NORMALIZATION``
    :param fps: frame rate used for conversion from/to frame numbers
    :return: the input unchanged when both units are the same,
             a truncated integer when converting to frames,
             otherwise the converted value
    :raises ValueError: when either unit is not supported
    """
    try:
        in_unit = UNIT_NORMALIZATION[in_unit]
    except KeyError:
        raise ValueError(f"Not supported time unit: {in_unit}")
    try:
        out_unit = UNIT_NORMALIZATION[out_unit]
    except KeyError:
        raise ValueError(f"Not supported time unit: {out_unit}")
    # s>s, ms>ms, f>f
    if in_unit == out_unit:
        return time
    if out_unit == 'frame':
        # ms>f
        if in_unit == 'millisecond':
            return int(time / 1000 * fps)
        # s>f
        return int(time * fps)
    # s>ms
    if in_unit == 'second':
        return time * 1000
    # ms>s (true division, so that sub-second precision is not thrown away)
    if in_unit == 'millisecond':
        return time / 1000
    # f>ms, f>s
    return (time / fps) if out_unit == 'second' else (time / fps * 1000)  # pytype: disable=bad-return-type


def get_annotation_property(mmif, annotation, prop_name):
    """
    Gets a property of an annotation, falling back to the ``contains`` metadata
    of the annotation's parent view when the annotation itself does not have the property.

    :param mmif: MMIF object that holds the parent view of the annotation
    :param annotation: annotation to read the property from
    :param prop_name: name of the property
    :return: value of the property
    :raises KeyError: when the property is found in neither place
    """
    # TODO (krim @ 7/18/23): this probably should be merged to the main mmif.serialize package
    if prop_name in annotation:
        return annotation.get_property(prop_name)
    try:
        return mmif[annotation.parent].metadata.contains[annotation.at_type][prop_name]
    except KeyError:
        raise KeyError(f"Annotation {annotation.id} does not have {prop_name} property.")


def convert_timepoint(mmif: Mmif, timepoint: Annotation, out_unit: str) -> Union[int, float]:
    """
    Converts a time point included in an annotation to a different time unit.
    The input annotation must have ``timePoint`` property.

    :param mmif: input MMIF to obtain fps and input timeunit
    :param timepoint: annotation with ``timePoint`` property
    :param out_unit: time unit to which the point is converted
    :return: frame number (integer) or second/millisecond (float) of input timepoint
    """
    in_unit = get_annotation_property(mmif, timepoint, 'timeUnit')
    vd = mmif[get_annotation_property(mmif, timepoint, 'document')]
    return convert(timepoint.get_property('timePoint'), in_unit, out_unit, get_framerate(vd))


def convert_timeframe(mmif: Mmif, timeframe: Annotation, out_unit: str) -> Union[Tuple[int, int], Tuple[float, float]]:
    """
    Converts start and end points in a TimeFrame annotation to a different time unit.

    :param mmif: input MMIF to obtain fps and input timeunit
    :param timeframe: ``TimeFrame`` type annotation
    :param out_unit: time unit to which the point is converted
    :return: tuple of frame numbers (integer) or seconds/milliseconds (float) of input start and end
    """
    in_unit = get_annotation_property(mmif, timeframe, 'timeUnit')
    vd = mmif[get_annotation_property(mmif, timeframe, 'document')]
    # look the frame rate up once; both end points belong to the same video
    fps = get_framerate(vd)
    start = convert(timeframe.get_property('start'), in_unit, out_unit, fps)
    end = convert(timeframe.get_property('end'), in_unit, out_unit, fps)
    return start, end


def framenum_to_second(video_doc: Document, frame: int):
    """
    Converts a frame number to seconds using the frame rate of the video document.

    :param video_doc: VideoDocument object to obtain the frame rate from
    :param frame: frame number
    :return: time in seconds
    """
    fps = get_framerate(video_doc)
    return convert(frame, 'f', 's', fps)


def framenum_to_millisecond(video_doc: Document, frame: int):
    """
    Converts a frame number to milliseconds using the frame rate of the video document.

    :param video_doc: VideoDocument object to obtain the frame rate from
    :param frame: frame number
    :return: time in milliseconds
    """
    fps = get_framerate(video_doc)
    return convert(frame, 'f', 'ms', fps)


def second_to_framenum(video_doc: Document, second) -> int:
    """
    Converts seconds to a frame number using the frame rate of the video document.

    :param video_doc: VideoDocument object to obtain the frame rate from
    :param second: time in seconds
    :return: frame number, truncated to an integer
    """
    fps = get_framerate(video_doc)
    return int(convert(second, 's', 'f', fps))


def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int:
    """
    Converts milliseconds to a frame number using the frame rate of the video document.

    :param video_doc: VideoDocument object to obtain the frame rate from
    :param millisecond: time in milliseconds
    :return: frame number, truncated to an integer
    """
    fps = get_framerate(video_doc)
    return int(convert(millisecond, 'ms', 'f', fps))
class TestUtilsVideoDocuments(unittest.TestCase):
    """Unit tests for the time/frame helpers in ``mmif.utils.video_document_helper``."""

    def setUp(self):
        """Builds a MMIF with one view and one video document whose ``fps`` property is preset."""
        self.fps = 29.97
        self.mmif_obj = Mmif(validate=False)
        self.a_view = self.mmif_obj.new_view()
        doc_json = {
            "@type": "http://mmif.clams.ai/vocabulary/VideoDocument/v1",
            "properties": {
                "mime": "video",
                "id": "d1",
                "location": "file:///home/snewman/Documents/test_vid.mp4",
            },
        }
        self.video_doc = Document(doc_json)
        self.video_doc.add_property('fps', self.fps)
        self.mmif_obj.add_document(self.video_doc)

    def test_get_framerate(self):
        actual = vdh.get_framerate(self.video_doc)
        self.assertAlmostEqual(29.97, actual, places=0)

    def test_frames_to_seconds(self):
        actual = vdh.framenum_to_second(self.video_doc, 100)
        self.assertAlmostEqual(3.337, actual, places=0)

    def test_frames_to_milliseconds(self):
        actual = vdh.framenum_to_millisecond(self.video_doc, 100)
        self.assertAlmostEqual(3337.0, actual, places=0)

    def test_seconds_to_frames(self):
        actual = vdh.second_to_framenum(self.video_doc, 3.337)
        self.assertAlmostEqual(100, actual, places=0)

    def test_milliseconds_to_frames(self):
        actual = vdh.millisecond_to_framenum(self.video_doc, 3337.0)
        self.assertAlmostEqual(100, actual, places=0)

    def test_sample_frames(self):
        # each case: (end second, sample ratio, expected number of sampled frames)
        # note that int(29.97) = 29
        cases = ((5.5, self.fps, 3), (5, 60, 1))
        for end_second, ratio, expected_count in cases:
            s_frame = vdh.second_to_framenum(self.video_doc, 3)
            e_frame = vdh.second_to_framenum(self.video_doc, end_second)
            sampled = vdh.sample_frames(s_frame, e_frame, ratio)
            self.assertEqual(expected_count, len(sampled))

    def test_convert_timepoint(self):
        timepoint_ann = self.a_view.new_annotation(
            AnnotationTypes.BoundingBox, timePoint=3, timeUnit='second', document='d1')
        expected = vdh.convert(3, 's', 'f', self.fps)
        actual = vdh.convert_timepoint(self.mmif_obj, timepoint_ann, 'f')
        self.assertEqual(expected, actual)

    def test_convert_timeframe(self):
        # timeUnit and document come from the view's `contains` metadata, not from the annotation
        self.a_view.metadata.new_contain(AnnotationTypes.TimeFrame, timeUnit='frame', document='d1')
        timeframe_ann = self.a_view.new_annotation(AnnotationTypes.TimeFrame, start=100, end=200)
        converted = vdh.convert_timeframe(self.mmif_obj, timeframe_ann, 's')
        for expected, actual in zip((3.337, 6.674), converted):
            self.assertAlmostEqual(expected, actual, places=0)


if __name__ == '__main__':
    unittest.main()