From 6846431d5380f05bb61e4b88583b2eece79f6a4d Mon Sep 17 00:00:00 2001 From: Darkheir Date: Thu, 27 Feb 2025 15:10:03 +0100 Subject: [PATCH] feat: Improve openapi to module script --- sekoia_automation/action.py | 5 +- sekoia_automation/scripts/openapi.py | 124 +++++++++++++++++++-------- 2 files changed, 90 insertions(+), 39 deletions(-) diff --git a/sekoia_automation/action.py b/sekoia_automation/action.py index 245e82c..eaa7239 100644 --- a/sekoia_automation/action.py +++ b/sekoia_automation/action.py @@ -271,6 +271,7 @@ def send_results(self): class GenericAPIAction(Action): # Endpoint Specific Information, should be defined in subclasses + base_url = "" verb: str endpoint: str query_parameters: list[str] @@ -284,8 +285,8 @@ def get_headers(self): return headers def get_url(self, arguments): - # Specific Informations, should be defined in the Module Configuration - url = self.module.configuration["base_url"] + # Specific Information, should be defined in the Module Configuration + url = self.module.configuration.get("base_url") or self.base_url match = re.findall("{(.*?)}", self.endpoint) for replacement in match: diff --git a/sekoia_automation/scripts/openapi.py b/sekoia_automation/scripts/openapi.py index 8e8bc01..6cab6cf 100644 --- a/sekoia_automation/scripts/openapi.py +++ b/sekoia_automation/scripts/openapi.py @@ -5,20 +5,28 @@ import shutil from functools import cached_property from pathlib import Path +from posixpath import join as urljoin from uuid import uuid4 import requests import typer +import yaml from black import Mode, WriteBack, format_file_in_place from requests import Response from rich import print class OpenApiToModule: + file_header = """ +from sekoia_automation.action import GenericAPIAction + +class Base(GenericAPIAction): + base_url = "{base_url}" +""" class_template = """ -class {name}(GenericAPIAction): +class {name}(Base): verb = "{verb}" - endpoint = base_url + "{endpoint}" + endpoint = "{endpoint}" query_parameters = {query_parameters} """ @@ -27,14 +35,22 @@ def swagger(self) -> dict: """ Read a swagger from a specified url or from a file. """ - if self.swagger_file.startswith("http") and self.swagger_file.endswith(".json"): - response: Response = requests.get( - self.swagger_file, headers={"Accept": "application/json"}, timeout=30 - ) + if self.swagger_file.startswith("http"): + response: Response = requests.get(self.swagger_file, timeout=30) + print(response.text) response.raise_for_status() - return response.json() + if self.swagger_file.endswith(".json"): + return response.json() + if self.swagger_file.endswith(".yaml") or self.swagger_file.endswith( + ".yml" + ): + return yaml.safe_load(response.text) with open(self.swagger_file) as fd: + if self.swagger_file.endswith(".yaml") or self.swagger_file.endswith( + ".yml" + ): + return yaml.safe_load(fd) return json.load(fd) @property @@ -58,7 +74,7 @@ def __init__(self, modules_path: Path, swagger_file: str, use_tags: bool): self.swagger_file = swagger_file self.use_tags = use_tags self.modules_path = modules_path - self.actions: list[str] = [] + self.actions: list[dict] = [] self.classes: list[str] = [] def add_results_to_action(self, action: dict, action_method: dict): @@ -99,20 +115,41 @@ def get_properties(endpoint_params: dict, action_method: dict) -> tuple[dict, li if param_required: required.append(param_name) - if param_in in ["body", "formData"] or "schema" in param.keys(): - param = param["schema"] - for v in param.get("properties", {}).values(): + if param_in in ["body", "formData"] and "schema" in param.keys(): + nested_param = param["schema"] + for v in nested_param.get("properties", {}).values(): v.update({"in": param_in}) - properties.update(param.get("properties", {})) - required += param.get("required", []) + properties.update(nested_param.get("properties", {})) + required += nested_param.get("required", []) continue + prop = {"in": param_in} - if _type := param.get("type"): - prop["type"] = _type + for key in [ + "type", + "description", + "example", + "default", + "minimum", + "maximum", + "enum", + ]: + if key in param: + prop[key] = param[key] + elif key in param.get("schema", {}): + prop[key] = param["schema"][key] properties.update({param_name: prop}) + if body := action_method.get("requestBody"): + content: dict = next(iter(body["content"].values()), {}) + if schema := content.get("schema"): + for v in schema.get("properties", {}).values(): + v.update({"in": "body"}) + + properties.update(schema.get("properties", {})) + required += schema.get("required", []) + return properties, required @staticmethod @@ -130,9 +167,10 @@ def get_action_name( else: name = docker_parameters - return re.sub("[^A-Za-z1-9]", "_", name).lower() + return re.sub("[^A-Za-z0-9]", "_", name).lower().strip("_") - def generate_actions(self, endpoint: str, methods: dict): + def generate_actions(self, endpoint: str, methods: dict, base_path: str = ""): + endpoint = endpoint.lstrip("/") list_methods: list = list(methods.keys()) endpoint_params: dict = methods.get("parameters", {}) @@ -142,19 +180,26 @@ def generate_actions(self, endpoint: str, methods: dict): action_method: dict = methods[method] - docker_parameters: str = f"{method}-{endpoint.lstrip('/')}" + docker_parameters: str = ( + action_method.get("operationId") or f"{method}-{endpoint}" + ) name: str = self.get_action_name( action_method, docker_parameters, self.use_tags ) - action_path: str = os.path.join(self.module_path, f"action_{name}.json") + action_path = os.path.join(self.module_path, f"action_{name}.json") if action_path in self.actions: - print("[orange3] Action name '{name}' already used[/orange3]") - continue + name = f"{name}_{method}" + action_path = os.path.join(self.module_path, f"action_{name}.json") + if action_path in self.actions: + print(f"[orange3] Action name '{name}' already used[/orange3]") + continue action: dict = { "uuid": str(uuid4()), "name": name, "docker_parameters": docker_parameters, + "method": method, + "endpoint": urljoin(base_path, endpoint), } desc: str = str(action_method.get("description")) @@ -179,9 +224,18 @@ def generate_actions(self, endpoint: str, methods: dict): self.add_results_to_action(action=action, action_method=action_method) with open(action_path, "w") as fd: - fd.write(json.dumps(action, indent=4)) + fd.write( + json.dumps( + { + k: v + for k, v in action.items() + if k not in ["method", "endpoint"] + }, + indent=4, + ) + ) - self.actions.append(action_path) + self.actions.append(action) def generate_module(self) -> str: info = self.swagger["info"] @@ -203,32 +257,26 @@ def generate_module(self) -> str: return docker def generate_classes(self): - content = ( - 'from sekoia_automation.action import GenericAPIAction\n\n\nbase_url = ""\n' - ) + base_url = self.swagger.get("servers", [{}])[0].get("url", "") + content = self.file_header.format(base_url=base_url) - for file in self.actions: - with open(file) as fd: - action_json: dict = json.load(fd) - - docker_parameters = action_json["docker_parameters"] + for action in self.actions: + docker_parameters = action["docker_parameters"] query_parameters: list = [ x - for x, v in action_json["arguments"]["properties"].items() + for x, v in action["arguments"]["properties"].items() if v.get("in") == "query" ] - name = re.sub("[^A-Za-z1-9]", "-", action_json["name"]) + name = re.sub("[^A-Za-z0-9]", "-", action["name"]) for article in ["a", "an", "the"]: name = name.replace("-" + article + "-", "-") name = "".join(list(map(str.capitalize, name.split("-")))) - verb, endpoint = docker_parameters.split("-", 1) - content += self.class_template.format( name=name, - verb=verb, - endpoint=endpoint, + verb=action["method"], + endpoint=action["endpoint"], query_parameters=query_parameters, ) @@ -282,10 +330,12 @@ def run(self): print(f"Module generated at {self.module_path}") print("Generating actions ...") + base_path = self.swagger.get("basePath", "") for endpoint, methods in self.swagger.get("paths").items(): self.generate_actions( endpoint=endpoint, methods=methods, + base_path=base_path, ) print(f"{len(self.actions)} actions have been generated")