|
| 1 | +import json |
| 2 | +import warnings |
| 3 | +from typing import Optional |
| 4 | + |
1 | 5 | import flask
|
2 | 6 | from flask import Blueprint, current_app, jsonify
|
3 |
| - |
4 | 7 | from flask_parameter_validation import ValidateParameters
|
| 8 | +from flask_parameter_validation.exceptions.exceptions import ConfigurationError |
| 9 | +import re |
5 | 10 |
|
6 | 11 | docs_blueprint = Blueprint(
|
7 | 12 | "docs", __name__, url_prefix="/docs", template_folder="./templates"
|
@@ -38,6 +43,8 @@ def get_function_docs(func):
|
38 | 43 | "docstring": format_docstring(fdocs.get("docstring")),
|
39 | 44 | "decorators": fdocs.get("decorators"),
|
40 | 45 | "args": extract_argument_details(fdocs),
|
| 46 | + "deprecated": fdocs.get("deprecated"), |
| 47 | + "responses": fdocs.get("openapi_responses"), |
41 | 48 | }
|
42 | 49 | return None
|
43 | 50 |
|
@@ -141,3 +148,188 @@ def docs_json():
|
141 | 148 | "default_theme": config.get("FPV_DOCS_DEFAULT_THEME", "light"),
|
142 | 149 | }
|
143 | 150 | )
|
| 151 | + |
| 152 | + |
| 153 | +def fpv_error(message): |
| 154 | + return jsonify({"error": message}) |
| 155 | + |
| 156 | + |
| 157 | +def parameter_required(param): |
| 158 | + if param["type"].startswith("Optional["): |
| 159 | + return False |
| 160 | + elif "default" in param["loc_args"]: |
| 161 | + return False |
| 162 | + return True |
| 163 | + |
| 164 | +def generate_json_schema_helper(param, param_type, parent_group=None): |
| 165 | + match = re.match(r'(\w+)\[([\w\[\] ,.]+)]', param_type) |
| 166 | + if match: |
| 167 | + type_group = match.group(1) |
| 168 | + type_params = match.group(2) |
| 169 | + return generate_json_schema_helper(param, type_params, parent_group=type_group) |
| 170 | + elif "|" in param_type and "[" not in param_type: # Handle Union shorthand as Union |
| 171 | + return generate_json_schema_helper(param, f"Union[{param_type.replace('|', ',')}]", parent_group=parent_group) |
| 172 | + else: |
| 173 | + schemas = [] |
| 174 | + param_types = [param_type] |
| 175 | + if parent_group in ["Union", "Optional"]: |
| 176 | + if "," in param_type: |
| 177 | + param_types = [p.strip() for p in param_type.split(",")] |
| 178 | + for p in param_types: |
| 179 | + print(f"{param['name']}: {p}") |
| 180 | + subschema = {} |
| 181 | + if p == "str": |
| 182 | + subschema["type"] = "string" |
| 183 | + if "min_str_length" in param["loc_args"]: |
| 184 | + subschema["minLength"] = param["loc_args"]["min_str_length"] |
| 185 | + if "max_str_length" in param["loc_args"]: |
| 186 | + subschema["maxLength"] = param["loc_args"]["max_str_length"] |
| 187 | + # TODO: Is it possible to make this work with whitelist, blacklist and pattern simultaneously? |
| 188 | + elif p == "int": |
| 189 | + subschema["type"] = "integer" |
| 190 | + if "min_int" in param["loc_args"]: |
| 191 | + subschema["minimum"] = param["loc_args"]["min_int"] |
| 192 | + if "max_int" in param["loc_args"]: |
| 193 | + subschema["maximum"] = param["loc_args"]["max_int"] |
| 194 | + elif p == "bool": |
| 195 | + subschema["type"] = "boolean" |
| 196 | + elif p == "float": |
| 197 | + subschema["type"] = "number" |
| 198 | + elif p in ["datetime", "datetime.datetime"]: |
| 199 | + subschema["type"] = "string" |
| 200 | + subschema["format"] = "date-time" |
| 201 | + if "datetime_format" in param["loc_args"]: |
| 202 | + warnings.warn("datetime_format cannot be translated to JSON Schema, please use ISO8601 date-time", |
| 203 | + Warning, stacklevel=2) |
| 204 | + elif p in ["date", "datetime.date"]: |
| 205 | + subschema["type"] = "string" |
| 206 | + subschema["format"] = "date" |
| 207 | + elif p in ["time", "datetime.time"]: |
| 208 | + subschema["type"] = "string" |
| 209 | + subschema["format"] = "time" |
| 210 | + elif p == "dict": |
| 211 | + subschema["type"] = "object" |
| 212 | + elif p in ["None", "NoneType"]: |
| 213 | + subschema["type"] = "null" |
| 214 | + else: |
| 215 | + print(f"Unexpected type: {p}") |
| 216 | + schemas.append(subschema) |
| 217 | + if len(schemas) == 1 and parent_group is None: |
| 218 | + return schemas[0] |
| 219 | + elif parent_group in ["Optional", "Union"]: |
| 220 | + return {"oneOf": schemas} |
| 221 | + elif parent_group == "List": |
| 222 | + schema = {"type": "array", "items": schemas[0]} |
| 223 | + if "min_list_length" in param["loc_args"]: |
| 224 | + schema["minItems"] = param["loc_args"]["min_list_length"] |
| 225 | + if "max_list_length" in param["loc_args"]: |
| 226 | + schema["maxItems"] = param["loc_args"]["max_list_length"] |
| 227 | + return schema |
| 228 | + else: |
| 229 | + print(f"Unexpected situation: {param_type}, {parent_group}") |
| 230 | + |
| 231 | + |
| 232 | +def generate_json_schema_for_parameter(param): |
| 233 | + return generate_json_schema_helper(param, param["type"]) |
| 234 | + |
| 235 | + |
| 236 | +def generate_json_schema_for_parameters(params): |
| 237 | + schema = { |
| 238 | + "type": "object", |
| 239 | + "properties": {}, |
| 240 | + "required": [] |
| 241 | + } |
| 242 | + for p in params: |
| 243 | + schema_parameter_name = p["name"] if "alias" not in p["loc_args"] else p["loc_args"]["alias"] |
| 244 | + if "json_schema" in p["loc_args"]: |
| 245 | + schema["properties"][schema_parameter_name] = p["loc_args"]["json_schema"] |
| 246 | + else: |
| 247 | + schema["properties"][schema_parameter_name] = generate_json_schema_for_parameter(p) |
| 248 | + if parameter_required(p): |
| 249 | + schema["required"].append(schema_parameter_name) |
| 250 | + return schema |
| 251 | + |
| 252 | +def generate_openapi_paths_object(): |
| 253 | + oapi_paths = {} |
| 254 | + for route in get_route_docs(): |
| 255 | + oapi_path_route = re.sub(r'<(\w+):(\w+)>', r'{\2}', route['rule']) |
| 256 | + oapi_path_route = re.sub(r'<(\w+)>', r'{\1}', oapi_path_route) |
| 257 | + print(f"Adding {route['rule']} to paths as {oapi_path_route}") |
| 258 | + oapi_path_item = {} |
| 259 | + oapi_operation = {} # tags, summary, description, externalDocs, operationId, parameters, requestBody, responses, callbacks, deprecated, security, servers |
| 260 | + oapi_parameters = [] |
| 261 | + oapi_request_body = {"content": {}} |
| 262 | + for arg_loc in route["args"]: |
| 263 | + if arg_loc == "Form": |
| 264 | + oapi_request_body["content"]["application/x-www-form-urlencoded"] = { |
| 265 | + "schema": generate_json_schema_for_parameters(route["args"][arg_loc])} |
| 266 | + elif arg_loc == "Json": |
| 267 | + oapi_request_body["content"]["application/json"] = { |
| 268 | + "schema": generate_json_schema_for_parameters(route["args"][arg_loc])} |
| 269 | + elif arg_loc == "File": # See https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.1.0.md#considerations-for-file-uploads |
| 270 | + for arg in route["args"][arg_loc]: |
| 271 | + if "content_types" in arg["loc_args"]: |
| 272 | + for content_type in arg["loc_args"]["content_types"]: |
| 273 | + oapi_request_body["content"][content_type] = {} |
| 274 | + else: |
| 275 | + oapi_request_body["content"]["application/octet-stream"] = {} |
| 276 | + elif arg_loc in ["Route", "Query"]: |
| 277 | + for arg in route["args"][arg_loc]: |
| 278 | + if "alias" in arg["loc_args"]: |
| 279 | + oapi_path_route = oapi_path_route.replace(f'{{{arg["name"]}}}', |
| 280 | + f'{{{arg["loc_args"]["alias"]}}}') |
| 281 | + schema_arg_name = arg["name"] if "alias" not in arg["loc_args"] else arg["loc_args"]["alias"] |
| 282 | + if arg_loc == "Query" or (arg_loc == "Route" and f"{{{schema_arg_name}}}" in oapi_path_route): |
| 283 | + parameter = { |
| 284 | + "name": schema_arg_name, |
| 285 | + "in": "path" if arg_loc == "Route" else "query", |
| 286 | + "required": True if arg_loc == "Route" else parameter_required(arg), |
| 287 | + "schema": arg["loc_args"]["json_schema"] if "json_schema" in arg[ |
| 288 | + "loc_args"] else generate_json_schema_for_parameter(arg), |
| 289 | + } |
| 290 | + if "deprecated" in arg["loc_args"] and arg["loc_args"]["deprecated"]: |
| 291 | + parameter["deprecated"] = arg["loc_args"]["deprecated"] |
| 292 | + oapi_parameters.append(parameter) |
| 293 | + if len(oapi_parameters) > 0: |
| 294 | + oapi_operation["parameters"] = oapi_parameters |
| 295 | + if len(oapi_request_body["content"].keys()) > 0: |
| 296 | + oapi_operation["requestBody"] = oapi_request_body |
| 297 | + print(route["decorators"]) |
| 298 | + for decorator in route["decorators"]: |
| 299 | + for partial_decorator in ["@warnings.deprecated", "@deprecated"]: # Support for PEP 702 in Python 3.13 |
| 300 | + if partial_decorator in decorator: |
| 301 | + oapi_operation["deprecated"] = True |
| 302 | + if route["deprecated"]: # Fallback on kwarg passed to @ValidateParameters() |
| 303 | + oapi_operation["deprecated"] = route["deprecated"] |
| 304 | + if route["responses"]: |
| 305 | + oapi_operation["responses"] = route["responses"] |
| 306 | + for method in route["methods"]: |
| 307 | + if method not in ["OPTIONS", "HEAD"]: |
| 308 | + oapi_path_item[method.lower()] = oapi_operation |
| 309 | + if oapi_path_route in oapi_paths: |
| 310 | + oapi_paths[oapi_path_route] = oapi_paths[oapi_path_route] | oapi_path_item |
| 311 | + else: |
| 312 | + oapi_paths[oapi_path_route] = oapi_path_item |
| 313 | + return oapi_paths |
| 314 | + |
| 315 | + |
| 316 | + |
| 317 | +@docs_blueprint.route("/openapi") |
| 318 | +def docs_openapi(): |
| 319 | + """ |
| 320 | + Provide the documentation in OpenAPI format |
| 321 | + """ |
| 322 | + config = flask.current_app.config |
| 323 | + if not config.get("FPV_OPENAPI_ENABLE", False): |
| 324 | + return fpv_error("FPV_OPENAPI_ENABLE is not set, and defaults to False") |
| 325 | + |
| 326 | + supported_versions = ["3.1.0"] |
| 327 | + openapi_base = config.get("FPV_OPENAPI_BASE", {"openapi": None}) |
| 328 | + if openapi_base["openapi"] not in supported_versions: |
| 329 | + return fpv_error(f"Flask-Parameter-Validation only supports OpenAPI {', '.join(supported_versions)}, {openapi_base['openapi']} provided") |
| 330 | + if "paths" in openapi_base: |
| 331 | + return fpv_error(f"Flask-Parameter-Validation will overwrite the paths value of FPV_OPENAPI_BASE") |
| 332 | + openapi_paths = generate_openapi_paths_object() |
| 333 | + openapi_document = json.loads(json.dumps(openapi_base)) |
| 334 | + openapi_document["paths"] = openapi_paths |
| 335 | + return jsonify(openapi_document) |
0 commit comments