Prototype for storage API #7

Open · wants to merge 16 commits into base: main
Changes from 8 commits
1 change: 1 addition & 0 deletions prototype/config.yml
@@ -0,0 +1 @@
storage_dir: C:\Users\eljor\desktop\fast_playground\
175 changes: 175 additions & 0 deletions prototype/storage_api.py
@@ -0,0 +1,175 @@
from mmif import Mmif
from clams import mmif_utils
from flask import Flask, request, jsonify, send_from_directory
from enum import Enum
from pydantic import BaseModel
from typing import List, Dict
from typing_extensions import Annotated
import os
import yaml
Review comment (Member):

this yaml import is used only for a limited purpose, namely loading the config file. However, the existing code is based on a .env file and the dotenv module, so let's migrate to environment-variable-based configuration to match the existing code.
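A minimal sketch of the suggested migration, assuming a `STORAGE_DIR` variable (the variable name and default path are illustrative, not part of the existing code). In the merged codebase, python-dotenv's `load_dotenv()` would populate `os.environ` from a `.env` file; here that step is simulated so the lookup is self-contained:

```python
import os

# Simulate what python-dotenv's load_dotenv() would do after reading a
# .env file containing e.g.  STORAGE_DIR=/data/mmif-storage
os.environ.setdefault("STORAGE_DIR", "/data/mmif-storage")

# The app would then read its storage root from the environment,
# replacing the yaml.safe_load(open('config.yml')) calls below.
storage_dir = os.environ["STORAGE_DIR"]
```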

import hashlib
import json


app = Flask(__name__)
# get post request from user
# read mmif inside post request, get view metadata
# store in nested directory relating to view metadata

# TODO: this app accepts "unresolvable" as an app version number; it needs to be fixed because
# TODO: "unresolvable" is not specific and can represent multiple versions.


@app.route("/")
def root():
    return {"message": "Storage api for pipelined mmif files"}


@app.route("/upload_mmif/", methods=["POST"])
Review comment (Member):

let's think about the routings here. The existing "baapb resolver" codebase (api/__init__.py) uses many routings, but only one of them (/searchapi/) is a pure API landing point (the others are attached to web pages with minimal front-end implementations).

Eventually, we want to run a single server app that can handle "assets" (video files) as well as "mmif", meaning this storage_api.py should be merged into the api/ package. To that end, I think we need some basic rules for naming these routing paths.

Since /searchapi is already used on the production server (for baapb:// resolution), how about

  • /upload_mmif >> /storeapi/mmif
  • /retrieve >> /searchapi/mmif
    ?

Note that changes in the routing names will also impact the client-side implementation (that you're currently working on).
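The rename proposed above could be sketched as a simple old-to-new mapping that both server and client code could share during the migration (the new paths are this thread's suggestion, not yet implemented anywhere):

```python
# Proposed route renames from this review thread (suggestion only);
# prototype paths on the left, suggested merged-server paths on the right.
PROPOSED_ROUTES = {
    "/upload_mmif/": "/storeapi/mmif",   # storing pipelined MMIF files
    "/retrieve/":    "/searchapi/mmif",  # retrieving stored MMIF files
}


def migrate_route(old_path):
    """Map a prototype route to its proposed replacement, if one exists."""
    return PROPOSED_ROUTES.get(old_path, old_path)
```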

def upload_mmif():
    body = request.get_data(as_text=True)
    # read the local storage directory from config.yml
    with open('config.yml', 'r') as file:
        config = yaml.safe_load(file)
    directory = config['storage_dir']
    mmif = Mmif(body)
    # get the guid from the document location
    # document = body.[0]['properties']['location'].split('/')[2].split('.')[0]
    document = mmif.documents['d1']['properties'].location.split('/')[2].split('.')[0]
    # append '.mmif' to the guid
    document = document + '.mmif'
    # IMPORTANT: in order to enable directory creation after this loop, and also to store each
    # parameter dictionary in its proper directory, we build a dictionary associating the current
    # path level with its param dict. After the loop, we create the dirs and then iterate through
    # this dictionary to place the param dicts in their proper spots.
    param_path_dict = {}
    for view in mmif.views:
        # this should return the back half of the app url: just the app name and version number
        subdir_list = view.metadata.app.split('/')[3:]
        # create the path string for this view
        view_path = os.path.join('', *subdir_list)
        # now convert the parameter dictionary to a string and hash it;
        # this hash will be the name of another subdirectory.
        try:
            param_dict = view.metadata["parameters"]
            param_list = ['='.join(pair) for pair in param_dict.items()]
            param_list.sort()
            param_string = ','.join(param_list)
        except KeyError:
            param_dict = ""
            param_string = ""
        # hash the (sorted and concatenated) parameter string and join it with the path.
        # NOTE: this is *not* for security purposes, so the usage of md5 is not an issue.
        param_hash = hashlib.md5(param_string.encode('utf-8')).hexdigest()
        view_path = os.path.join(view_path, param_hash)
        # check whether this is a duplicate view; if it is, skip it.
        # NOTE: duplicate views are those with the same app, version number, AND parameter dict.
        if view_path in directory:
            continue
        # extend the accumulated path with the current view path
        directory = os.path.join(directory, view_path)
        # now that we know it's not a duplicate view and we have the proper path location,
        # store it and the associated param dict inside param_path_dict.
        param_path_dict[directory] = param_dict
    # we have finished looping through the views; now create the directories
    # and dump the param dicts
    os.makedirs(directory, exist_ok=True)
    for path in param_path_dict:
        file_path = os.path.join(path, 'parameters.json')
Review comment (Member):

I tried this out, and figured it would be more convenient for clients if we store only .mmif files in this directory, and add an additional api that returns the entire absolute path of the directory (just like how baapb:// is resolved). That way, when code that uses pre-stored mmif files runs on the same machine where those files live, the client doesn't have to "download" the mmif files but can directly access the directory (if it exists) and use the files under it.

Review comment (Member):

And if we store the parameter files in the "appversion" directory, keeping only mmif files in the "hash" directory, clients can just use a * glob to read all mmif files without worrying about any additional metadata-like files.
So instead of

storage-root/
├── app1
│   ├── v1
│   │   ├── hash1
│   │   │   ├── guid1.mmif
│   │   │   └── params.json
│   │   ├── hash2
│   │   │   ├── guid1.mmif
│   │   │   └── params.json
│   │   └── hash3
│   │       ├── guid1.mmif
│   │       └── params.json
│   └── v2
│       └── hash1
│           ├── guid1.mmif
│           └── params.json
└── app2
    ├── v1
    │   └── hash1
    │       ├── guid1.mmif
    │       └── params.json
    └── v2
        └── hash1
            ├── guid1.mmif
            └── params.json

we would store:

storage-root/
├── app1
│   ├── v1
│   │   ├── hash1
│   │   │   └── guid1.mmif
│   │   ├── hash1.json
│   │   ├── hash2
│   │   │   └── guid1.mmif
│   │   ├── hash2.json
│   │   ├── hash3
│   │   │   └── guid1.mmif
│   │   └── hash3.json
│   └── v2
│       ├── hash1
│       │   └── guid1.mmif
│       └── hash1.json
└── app2
    ├── v1
    │   ├── hash1
    │   │   └── guid1.mmif
    │   └── hash1.json
    └── v2
        ├── hash1
        │   └── guid1.mmif
        └── hash1.json
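The client-side convenience this layout enables can be sketched with a glob over a throwaway directory tree (the app, version, and guid names below are illustrative only): with each `params.json` stored as a sibling `<hash>.json` file, a pattern like `*/*/*/*.mmif` matches only MMIF files.

```python
import glob
import os
import tempfile

# Build a miniature storage tree matching the proposed layout.
root = tempfile.mkdtemp()
hash_dir = os.path.join(root, "app1", "v1", "hash1")
os.makedirs(hash_dir)
open(os.path.join(hash_dir, "guid1.mmif"), "w").close()
# The parameter file sits NEXT TO the hash directory, not inside it.
open(os.path.join(root, "app1", "v1", "hash1.json"), "w").close()

# Four path components deep, .mmif extension: picks up only MMIF files.
mmifs = glob.glob(os.path.join(root, "*", "*", "*", "*.mmif"))
```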

        with open(file_path, "w") as f:
            json.dump(param_path_dict[path], f)
    # put the mmif into the lowest-level directory, with a filename based on the guid
    file_path = os.path.join(directory, document)
    with open(file_path, "w") as f:
        f.write(mmif.serialize())
    return "Success", 201


@app.route("/retrieve/", methods=["POST"])
def download_mmif():
    # if not request.is_json:
    #     return {'error': 'Request must be JSON'}, 400
    data = json.loads(request.data.decode('utf-8'))
    # get both the pipeline and the guid from data;
    # obtain the pipeline using a helper method
    pipeline = pipeline_from_param_json(data)
    # get the number of views, for rewinding if necessary
    num_views = len(data['pipeline'])
    guid = data.get('guid')
Review comment (Member):

I think you mentioned you want to work on (or are already working on?) the multi-guid query scenario. As mentioned in the other comment, how about also adding a zero-guid query, to return just the full directory path?
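One possible shape for the zero-guid case suggested here, sketched outside Flask so the dispatch logic stands alone (the function name, return shapes, and paths are all hypothetical, not part of this PR):

```python
import os


def resolve_request(storage_root, pipeline_path, guid=None):
    """Resolve a retrieval request against the storage tree.

    Zero-guid: return just the resolved directory path, so a client on the
    same machine can read the files directly. With a guid: return the full
    path of the stored MMIF file.
    """
    directory = os.path.join(storage_root, pipeline_path)
    if guid is None:
        return {"path": directory}            # zero-guid query
    return os.path.join(directory, guid + ".mmif")
```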

    # validate the existence of both args
    if not pipeline or not guid:
        return jsonify({'error': 'Missing required parameters: need both pipeline & guid'}), 400
    # join the pipeline path with the local storage root
    with open('config.yml', 'r') as file:
        config = yaml.safe_load(file)
    storage = config['storage_dir']
    pipeline = os.path.join(storage, pipeline)
    guid = guid + ".mmif"
    # get the file from the storage directory
    path = os.path.join(pipeline, guid)
    # if the file exists, we can return it
    try:
        with open(path, 'r') as file:
            mmif = file.read()
            return mmif
    # otherwise, use the rewinder.
    # this assumes the user has provided a subset of a mmif pipeline that was previously stored;
    # in the case where that is not true, a FileNotFoundError is raised.
    except FileNotFoundError:
        return rewind_time(pipeline, guid, num_views)


# helper method for extracting the pipeline path
def pipeline_from_param_json(param_json):
    """
    Read a json containing the names of the pipelined apps and their respective
    parameters, and build a path out of the pipelined apps and hashed parameters.
    """
    pipeline = ""
    for clams_app in param_json["pipeline"]:
        # not using os.path.join until later, for testing purposes
        pipeline = pipeline + "/" + clams_app
        # try to get the param items
        try:
            param_list = ['='.join(pair) for pair in param_json["pipeline"][clams_app].items()]
            param_list.sort()
            param_string = ','.join(param_list)
        # an AttributeError is raised if the value is empty
        # (an empty value parses as a set, not a dict)
        except AttributeError:
            param_string = ""
        # hash the parameters
        param_hash = hashlib.md5(param_string.encode('utf-8')).hexdigest()
        pipeline = pipeline + "/" + param_hash
    # remove the leading "/" so it doesn't interfere with os.path.join later
    pipeline = pipeline[1:]
    return pipeline
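A worked example of the path scheme this helper builds, using the same md5-over-sorted-params convention as upload_mmif (the app name is taken from test.json; the parameter values are illustrative):

```python
import hashlib

# An app run with no parameters hashes the empty string:
no_param_hash = hashlib.md5("".encode("utf-8")).hexdigest()
# -> "d41d8cd98f00b204e9800998ecf8427e"

# An app run with parameters hashes the sorted, comma-joined "key=value" pairs:
params = {"tfLabel": "chyron", "pretty": ""}
param_list = sorted("=".join(pair) for pair in params.items())
param_string = ",".join(param_list)  # "pretty=,tfLabel=chyron"
param_hash = hashlib.md5(param_string.encode("utf-8")).hexdigest()

# The resulting path segment for this app+params combination:
path = "doctr-wrapper/unresolvable/" + param_hash
```

Because the pairs are sorted before joining, the same parameter dict always hashes to the same directory name regardless of key order.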


def rewind_time(pipeline, guid, num_views):
    """
    Take a pipeline (path), a guid, and a number of views, and use os.walk to iterate
    through directories that begin with that pipeline. The first mmif file matching the
    guid is rewound so that it includes only the views indicated by the pipeline.
    """
    for home, dirs, files in os.walk(pipeline):
        # find the mmif with the matching guid to rewind
        for file in files:
            if guid == file:
                with open(os.path.join(home, file), 'r') as f:
                    mmif = Mmif(f.read())
                # calculate the number of views to rewind, then rewind the mmif
                rewound = mmif_utils.rewind.rewind_mmif(mmif, len(mmif.views) - num_views)
                return rewound.serialize()
    raise FileNotFoundError


if __name__ == "__main__":
    app.run(port=8912)




22 changes: 22 additions & 0 deletions prototype/test.json
@@ -0,0 +1,22 @@
{
"pipeline": {
"swt-detection/unresolvable": {
},

"simple-timepoints-stitcher/v1.3": {
"labelMap": "['B:bars', 'S:slate', 'S-H:slate', 'S-C:slate', 'S-D:slate', 'S-G:slate', 'I:chyron', 'N:chyron', 'Y:chyron', 'C:credits']"
},

"doctr-wrapper/unresolvable": {
"tfLabel": "chyron",
"pretty": ""
},

"doctr-wrapper": {
"tfLabel": "['credits', 'credit']",
"pretty": ""
}

},
"guid": "cpb-aacip-0515ac167c0"
}
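A query like test.json could be sent to the /retrieve/ endpoint as follows; this is a sketch using only the standard library, with the host and port assumed from app.run(port=8912) above (the payload here is a trimmed version of test.json):

```python
import json
from urllib import request as urlreq

# Trimmed-down version of test.json for illustration.
payload = {
    "pipeline": {"swt-detection/unresolvable": {}},
    "guid": "cpb-aacip-0515ac167c0",
}

# Attaching a data body makes this a POST request, matching the route.
req = urlreq.Request(
    "http://localhost:8912/retrieve/",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
# urlreq.urlopen(req) would return the stored (or rewound) MMIF,
# assuming the server is running locally.
```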
12 changes: 12 additions & 0 deletions prototype/test_rewind.json
@@ -0,0 +1,12 @@
{
"pipeline": {
"swt-detection/unresolvable": {
},

"simple-timepoints-stitcher/v1.3": {
"labelMap": "['B:bars', 'S:slate', 'S-H:slate', 'S-C:slate', 'S-D:slate', 'S-G:slate', 'I:chyron', 'N:chyron', 'Y:chyron', 'C:credits']"
}

},
"guid": "cpb-aacip-0515ac167c0"
}