diff --git a/app.py b/app.py
index 21e4f9f..3d8f887 100644
--- a/app.py
+++ b/app.py
@@ -9,6 +9,8 @@ import cache
 from cache import set_last_access, cleanup
+from utils import app, render_ocr, documents_to_htmls, prep_annotations, prepare_ocr_visualization
+import traceback
 import utils
 from utils import app
 
@@ -27,7 +29,8 @@ def ocr():
         ocr_view = mmif.get_view_by_id(data["view_id"])
         return utils.prepare_ocr_visualization(mmif, ocr_view, data["mmif_id"])
     except Exception as e:
-        return f'<br/>{e}'
+        app.logger.error(f"{e}\n{traceback.format_exc()}")
+        return f'<br/>Error: {e} Check the server log for more information.'
 
 
 @app.route('/ocrpage', methods=['POST'])
diff --git a/displacy/__init__.py b/displacy/__init__.py
index e1e0dff..0ae0ffe 100644
--- a/displacy/__init__.py
+++ b/displacy/__init__.py
@@ -83,15 +83,9 @@ def mmif_to_dict(mmif: Mmif):
 
 
 def entity(view: View, annotation: Annotation):
-    if "targets" in annotation.properties:
-        start = min([view.annotations[target].properties["start"] for target in annotation.properties["targets"]])
-        end = max([view.annotations[target].properties["end"] for target in annotation.properties["targets"]])
-    else:
-        start = annotation.properties['start']
-        end = annotation.properties['end']
-    return {'start': start,
-            'end': end,
-            'label': annotation.properties['category']}
+    return {'start': annotation.get('start'),
+            'end': annotation.get('end'),
+            'label': annotation.get('category')}
 
 
 def dict_to_html(d):
diff --git a/ocr.py b/ocr.py
index 00af970..a964296 100644
--- a/ocr.py
+++ b/ocr.py
@@ -8,6 +8,7 @@ import os, shutil
 
 from flask import render_template
+from mmif import AnnotationTypes, DocumentTypes, Mmif
 from mmif.utils.video_document_helper import convert_timepoint, convert_timeframe
 
 import cache
@@ -35,27 +36,48 @@ def __init__(self, anno, mmif):
         self.update(anno, mmif)
 
     def update(self, anno, mmif):
-        if anno.at_type.shortname == "BoundingBox":
+
+        if anno.at_type == AnnotationTypes.BoundingBox:
             self.add_bounding_box(anno, mmif)
-        elif anno.at_type.shortname == "TimeFrame":
+        elif anno.at_type == AnnotationTypes.TimeFrame:
             self.add_timeframe(anno, mmif)
-        elif anno.at_type.shortname == "TextDocument":
-            t = anno.properties.get("text_value") or anno.properties.get("text").value
-            if t:
-                self.text.append(re.sub(r'([\\\/\|\"\'])', r'\1 ', t))
-
-    def add_bounding_box(self, anno, mmif):
-        self.frame_num = convert_timepoint(mmif, anno, "frames")
-        self.secs = convert_timepoint(mmif, anno, "seconds")
-        box_id = anno.properties["id"]
-        boxType = anno.properties["boxType"]
-        coordinates = anno.properties["coordinates"]
+        elif anno.at_type == AnnotationTypes.TimePoint:
+            self.add_timepoint(anno, mmif)
+
+        elif anno.at_type == DocumentTypes.TextDocument:
+            self.add_text_document(anno)
+
+        elif anno.at_type.shortname == "Paragraph":
+            view = mmif.get_view_by_id(anno.parent)
+            text_anno = mmif[anno.properties.get("document")]
+            self.add_text_document(text_anno)
+
+    def add_bounding_box(self, anno, mmif: Mmif):
+        timepoint_anno = None
+        if "timePoint" in anno.properties:
+            timepoint_anno = mmif[anno.get("timePoint")]
+
+        else:
+            for alignment_anns in mmif.get_alignments(AnnotationTypes.BoundingBox, AnnotationTypes.TimePoint).values():
+                for alignment_ann in alignment_anns:
+                    if alignment_ann.get('source') == anno.id:
+                        timepoint_anno = mmif[alignment_ann.get('target')]
+                        break
+                    elif alignment_ann.get('target') == anno.id:
+                        timepoint_anno = mmif[alignment_ann.get('source')]
+                        break
+        if timepoint_anno:
+            self.add_timepoint(timepoint_anno, mmif, skip_if_view_has_frames=False)
+
+        box_id = anno.get("id")
+        boxType = anno.get("boxType")
+        coordinates = anno.get("coordinates")
         x = coordinates[0][0]
         y = coordinates[0][1]
-        w = coordinates[3][0] - x
-        h = coordinates[3][1] - y
+        w = coordinates[1][0] - x
+        h = coordinates[1][1] - y
         box = [box_id, boxType, [x, y, w, h]]
         self.boxes.append(box)
         self.anno_ids.append(box_id)
@@ -64,40 +86,70 @@ def add_bounding_box(self, anno, mmif):
         self.boxtypes.append(anno.properties.get("boxType"))
 
     def add_timeframe(self, anno, mmif):
-        start, end = convert_timeframe(mmif, anno, "frames")
-        start_secs, end_secs = convert_timeframe(mmif, anno, "seconds")
+        # If annotation has multiple targets, pick the first and last as start and end
+        if "targets" in anno.properties:
+            start_id, end_id = anno.properties.get("targets")[0], anno.properties.get("targets")[-1]
+            anno_parent = mmif.get_view_by_id(anno.parent)
+            start_anno, end_anno = mmif[start_id], mmif[end_id]
+            start = convert_timepoint(mmif, start_anno, "frames")
+            end = convert_timepoint(mmif, end_anno, "frames")
+            start_secs = convert_timepoint(mmif, start_anno, "seconds")
+            end_secs = convert_timepoint(mmif, end_anno, "seconds")
+        else:
+            start, end = convert_timeframe(mmif, anno, "frames")
+            start_secs, end_secs = convert_timeframe(mmif, anno, "seconds")
         self.range = (start, end)
         self.timestamp_range = (str(datetime.timedelta(seconds=start_secs)),
                                 str(datetime.timedelta(seconds=end_secs)))
         self.sec_range = (start_secs, end_secs)
         if anno.properties.get("frameType"):
-            self.frametype = anno.properties.get("frameType")
-
-
-def find_annotation(anno_id, view, mmif):
-    if mmif.id_delimiter in anno_id:
-        view_id, anno_id = anno_id.split(mmif.id_delimiter)
-        view = mmif.get_view_by_id(view_id)
-    return view.get_annotation_by_id(anno_id)
-
-
-def get_ocr_frames(view, mmif, fps):
+            self.frametype = str(anno.properties.get("frameType"))
+        elif anno.properties.get("label"):
+            self.frametype = str(anno.properties.get("label"))
+
+    def add_timepoint(self, anno, mmif, skip_if_view_has_frames=True):
+        parent = mmif.get_view_by_id(anno.parent)
+        other_annotations = [k for k in parent.metadata.contains.keys() if k != anno.id]
+        # If there are TimeFrames in the same view, they most likely represent
+        # condensed information about representative frames (e.g. SWT). In this
+        # case, only render the TimeFrames and ignore the TimePoints.
+        if any([anno == AnnotationTypes.TimeFrame for anno in other_annotations]) and skip_if_view_has_frames:
+            return
+        self.frame_num = convert_timepoint(mmif, anno, "frames")
+        self.secs = convert_timepoint(mmif, anno, "seconds")
+        self.timestamp = str(datetime.timedelta(seconds=self.secs))
+        if anno.properties.get("label"):
+            self.frametype = anno.properties.get("label")
+
+    def add_text_document(self, anno):
+        t = anno.properties.get("text_value") or anno.properties.get("text").value
+        if t:
+            text_val = re.sub(r'([\\\/\|\"\'])', r'\1 ', t)
+            self.text = self.text + [text_val] if text_val not in self.text else self.text
+
+
+def get_ocr_frames(view, mmif):
     frames = {}
     full_alignment_type = [
-        at_type for at_type in view.metadata.contains if at_type.shortname == "Alignment"]
+        at_type for at_type in view.metadata.contains if at_type == AnnotationTypes.Alignment]
     # If view contains alignments
     if full_alignment_type:
         for alignment in view.get_annotations(full_alignment_type[0]):
-            source = find_annotation(alignment.properties["source"], view, mmif)
-            target = find_annotation(alignment.properties["target"], view, mmif)
+            source = mmif[alignment.get("source")]
+            target = mmif[alignment.get("target")]
+            # Account for alignment in either direction
             frame = OCRFrame(source, mmif)
+            frame.update(target, mmif)
+            i = frame.frame_num if frame.frame_num is not None else frame.range
+            if i is None:
+                continue
             if i in frames.keys():
                 frames[i].update(source, mmif)
                 frames[i].update(target, mmif)
             else:
-                frame.update(target, mmif)
                 frames[i] = frame
+
     else:
         for annotation in view.get_annotations():
             frame = OCRFrame(annotation, mmif)
@@ -108,6 +160,7 @@ def get_ocr_frames(view, mmif, fps):
                 frames[i].update(annotation, mmif)
             else:
                 frames[i] = frame
+    print(frames)
     return frames
 
 
@@ -175,7 +228,7 @@ def make_image_directory(mmif_id):
     return path
 
 
-def find_duplicates(frames_list, cv2_vid):
+def find_duplicates(frames_list):
     """Find duplicate frames"""
     prev_frame = None
     for frame_num, frame in frames_list:
@@ -239,18 +292,23 @@ def round_boxes(boxes):
 def get_ocr_views(mmif):
     """Returns all CV views, which contain timeframes or bounding boxes"""
     views = []
-    required_types = ["TimeFrame", "BoundingBox"]
+    required_types = ["TimeFrame", "BoundingBox", "TimePoint"]
     for view in mmif.views:
         for anno_type, anno in view.metadata.contains.items():
             # Annotation belongs to a CV view if it is a TimeFrame/BB and it refers to a VideoDocument
-            if anno_type.shortname in required_types and mmif.get_document_by_id(
-                    anno["document"]).at_type.shortname == "VideoDocument":
+            # if anno.get("document") is None:
+            #     continue
+            # if anno_type.shortname in required_types and mmif.get_document_by_id(
+            #         anno["document"]).at_type.shortname == "VideoDocument":
+            #     views.append(view)
+            #     continue
+            if anno_type.shortname in required_types:
                 views.append(view)
-                continue
+                break
             # TODO: Couldn't find a simple way to show if an alignment view is a CV/Frames-type view
             elif "parseq" in view.metadata.app:
                 views.append(view)
-                continue
+                break
     return views
diff --git a/utils.py b/utils.py
index 1c10e5c..63965ac 100644
--- a/utils.py
+++ b/utils.py
@@ -60,15 +60,17 @@ def asr_alignments_to_vtt(alignment_view, viz_id):
 
 
 def build_alignment(alignment, token_idx, timeframe_idx):
-    target = alignment.properties['target']
-    source = alignment.properties['source']
+    target = alignment.get('target')
+    source = alignment.get('source')
     timeframe = timeframe_idx.get(source)
     token = token_idx.get(target)
     if timeframe and token:
-        start = timeframe.properties['start']
-        end = timeframe.properties['end']
-        text = token.properties['word']
-        return start, end, text
+        start = timeframe.get('start')
+        end = timeframe.get('end')
+        for text_key in ['text', 'word']:
+            if text_key in token:
+                text = token.get(text_key)
+        return start, end, text
 
 
 def get_src_media_symlink_basename(doc: Document):
@@ -147,12 +149,12 @@ def get_boxes(mmif):
     # Javascript code that draws the rectangle.
     boxes = []
     for a in tbox_annotations:
-        coordinates = a.properties["coordinates"]
+        coordinates = a.get("coordinates")
         x = coordinates[0][0]
         y = coordinates[0][1]
         w = coordinates[1][0] - x
         h = coordinates[2][1] - y
-        box = [a.properties["id"], a.properties["boxType"], [x, y, w, h]]
+        box = [a.get("id"), a.get("boxType"), [x, y, w, h]]
         boxes.append(box)
     return boxes
 
@@ -245,7 +247,7 @@ def get_document_ids(view, annotation_type):
     for annotation in view.annotations:
         if annotation.at_type.shortname == str(annotation_type):
             try:
-                ids.add(annotation.properties["document"])
+                ids.add(annotation.get("document"))
             except KeyError:
                 pass
     return list(ids)
@@ -379,14 +381,12 @@ def prepare_ocr_visualization(mmif, view, mmif_id):
     """ Visualize OCR by extracting image frames with BoundingBoxes from video"""
     # frames, text_docs, alignments = {}, {}, {}
     vid_path = mmif.get_documents_by_type(DocumentTypes.VideoDocument)[0].location_path()
-    cv2_vid = cv2.VideoCapture(vid_path)
-    fps = cv2_vid.get(cv2.CAP_PROP_FPS)
-    ocr_frames = get_ocr_frames(view, mmif, fps)
+    ocr_frames = get_ocr_frames(view, mmif)
 
     # Generate pages (necessary to reduce IO cost) and render
     frames_list = [(k, vars(v)) for k, v in ocr_frames.items()]
-    frames_list = find_duplicates(frames_list, cv2_vid)
+    frames_list = find_duplicates(frames_list)
     frames_pages = paginate(frames_list)
     # Save page list as temp file
     save_json(frames_pages, view.id, mmif_id)