Skip to content

Commit

Permalink
feat: Improve openapi to module script
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkheir committed Feb 27, 2025
1 parent f18bea4 commit 6846431
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 39 deletions.
5 changes: 3 additions & 2 deletions sekoia_automation/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def send_results(self):

class GenericAPIAction(Action):
# Endpoint Specific Information, should be defined in subclasses
base_url = ""
verb: str
endpoint: str
query_parameters: list[str]
Expand All @@ -284,8 +285,8 @@ def get_headers(self):
return headers

def get_url(self, arguments):
# Specific Informations, should be defined in the Module Configuration
url = self.module.configuration["base_url"]
# Specific Information, should be defined in the Module Configuration
url = self.module.configuration.get("base_url") or self.base_url

match = re.findall("{(.*?)}", self.endpoint)
for replacement in match:
Expand Down
124 changes: 87 additions & 37 deletions sekoia_automation/scripts/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,28 @@
import shutil
from functools import cached_property
from pathlib import Path
from posixpath import join as urljoin
from uuid import uuid4

import requests
import typer
import yaml
from black import Mode, WriteBack, format_file_in_place
from requests import Response
from rich import print


class OpenApiToModule:
file_header = """
from sekoia_automation.action import GenericAPIAction
class Base(GenericAPIAction):
base_url = "{base_url}"
"""
class_template = """
class {name}(GenericAPIAction):
class {name}(Base):
verb = "{verb}"
endpoint = base_url + "{endpoint}"
endpoint = "{endpoint}"
query_parameters = {query_parameters}
"""

Expand All @@ -27,14 +35,22 @@ def swagger(self) -> dict:
"""
Read a swagger from a specified url or from a file.
"""
if self.swagger_file.startswith("http") and self.swagger_file.endswith(".json"):
response: Response = requests.get(
self.swagger_file, headers={"Accept": "application/json"}, timeout=30
)
if self.swagger_file.startswith("http"):
response: Response = requests.get(self.swagger_file, timeout=30)
print(response.text)
response.raise_for_status()
return response.json()
if self.swagger_file.endswith(".json"):
return response.json()
if self.swagger_file.endswith(".yaml") or self.swagger_file.endswith(
".yml"
):
return yaml.safe_load(response.text)

with open(self.swagger_file) as fd:
if self.swagger_file.endswith(".yaml") or self.swagger_file.endswith(
".yml"
):
return yaml.safe_load(fd)
return json.load(fd)

@property
Expand All @@ -58,7 +74,7 @@ def __init__(self, modules_path: Path, swagger_file: str, use_tags: bool):
self.swagger_file = swagger_file
self.use_tags = use_tags
self.modules_path = modules_path
self.actions: list[str] = []
self.actions: list[dict] = []
self.classes: list[str] = []

def add_results_to_action(self, action: dict, action_method: dict):
Expand Down Expand Up @@ -99,20 +115,41 @@ def get_properties(endpoint_params: dict, action_method: dict) -> tuple[dict, li
if param_required:
required.append(param_name)

if param_in in ["body", "formData"] or "schema" in param.keys():
param = param["schema"]
for v in param.get("properties", {}).values():
if param_in in ["body", "formData"] and "schema" in param.keys():
nested_param = param["schema"]
for v in nested_param.get("properties", {}).values():
v.update({"in": param_in})

properties.update(param.get("properties", {}))
required += param.get("required", [])
properties.update(nested_param.get("properties", {}))
required += nested_param.get("required", [])
continue

prop = {"in": param_in}
if _type := param.get("type"):
prop["type"] = _type
for key in [
"type",
"description",
"example",
"default",
"minimum",
"maximum",
"enum",
]:
if key in param:
prop[key] = param[key]
elif key in param.get("schema", {}):
prop[key] = param["schema"][key]

properties.update({param_name: prop})

if body := action_method.get("requestBody"):
content: dict = next(iter(body["content"].values()), {})
if schema := content.get("schema"):
for v in schema.get("properties", {}).values():
v.update({"in": "body"})

properties.update(schema.get("properties", {}))
required += schema.get("required", [])

return properties, required

@staticmethod
Expand All @@ -130,9 +167,10 @@ def get_action_name(
else:
name = docker_parameters

return re.sub("[^A-Za-z1-9]", "_", name).lower()
return re.sub("[^A-Za-z0-9]", "_", name).lower().strip("_")

def generate_actions(self, endpoint: str, methods: dict):
def generate_actions(self, endpoint: str, methods: dict, base_path: str = ""):
endpoint = endpoint.lstrip("/")
list_methods: list = list(methods.keys())

endpoint_params: dict = methods.get("parameters", {})
Expand All @@ -142,19 +180,26 @@ def generate_actions(self, endpoint: str, methods: dict):

action_method: dict = methods[method]

docker_parameters: str = f"{method}-{endpoint.lstrip('/')}"
docker_parameters: str = (
action_method.get("operationId") or f"{method}-{endpoint}"
)
name: str = self.get_action_name(
action_method, docker_parameters, self.use_tags
)
action_path: str = os.path.join(self.module_path, f"action_{name}.json")
action_path = os.path.join(self.module_path, f"action_{name}.json")
if action_path in self.actions:
print("[orange3] Action name '{name}' already used[/orange3]")
continue
name = f"{name}_{method}"
action_path = os.path.join(self.module_path, f"action_{name}.json")
if action_path in self.actions:
print(f"[orange3] Action name '{name}' already used[/orange3]")
continue

action: dict = {
"uuid": str(uuid4()),
"name": name,
"docker_parameters": docker_parameters,
"method": method,
"endpoint": urljoin(base_path, endpoint),
}
desc: str = str(action_method.get("description"))

Expand All @@ -179,9 +224,18 @@ def generate_actions(self, endpoint: str, methods: dict):
self.add_results_to_action(action=action, action_method=action_method)

with open(action_path, "w") as fd:
fd.write(json.dumps(action, indent=4))
fd.write(
json.dumps(
{
k: v
for k, v in action.items()
if k not in ["method", "endpoint"]
},
indent=4,
)
)

self.actions.append(action_path)
self.actions.append(action)

def generate_module(self) -> str:
info = self.swagger["info"]
Expand All @@ -203,32 +257,26 @@ def generate_module(self) -> str:
return docker

def generate_classes(self):
content = (
'from sekoia_automation.action import GenericAPIAction\n\n\nbase_url = ""\n'
)
base_url = self.swagger.get("servers", [{}])[0].get("url", "")
content = self.file_header.format(base_url=base_url)

for file in self.actions:
with open(file) as fd:
action_json: dict = json.load(fd)

docker_parameters = action_json["docker_parameters"]
for action in self.actions:
docker_parameters = action["docker_parameters"]
query_parameters: list = [
x
for x, v in action_json["arguments"]["properties"].items()
for x, v in action["arguments"]["properties"].items()
if v.get("in") == "query"
]

name = re.sub("[^A-Za-z1-9]", "-", action_json["name"])
name = re.sub("[^A-Za-z0-9]", "-", action["name"])
for article in ["a", "an", "the"]:
name = name.replace("-" + article + "-", "-")
name = "".join(list(map(str.capitalize, name.split("-"))))

verb, endpoint = docker_parameters.split("-", 1)

content += self.class_template.format(
name=name,
verb=verb,
endpoint=endpoint,
verb=action["method"],
endpoint=action["endpoint"],
query_parameters=query_parameters,
)

Expand Down Expand Up @@ -282,10 +330,12 @@ def run(self):
print(f"Module generated at {self.module_path}")

print("Generating actions ...")
base_path = self.swagger.get("basePath", "")
for endpoint, methods in self.swagger.get("paths").items():
self.generate_actions(
endpoint=endpoint,
methods=methods,
base_path=base_path,
)
print(f"{len(self.actions)} actions have been generated")

Expand Down

0 comments on commit 6846431

Please sign in to comment.