-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4664ed1
commit 78760fd
Showing
6 changed files
with
334 additions
and
66 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from boredcharts.cli.cli import main | ||
|
||
__all__ = [ | ||
"main", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
import argparse | ||
import asyncio | ||
import importlib | ||
import multiprocessing | ||
import time | ||
from pathlib import Path | ||
from typing import Literal, NamedTuple | ||
|
||
import httpx | ||
import uvicorn | ||
from fastapi import FastAPI | ||
from starlette.routing import NoMatchFound | ||
|
||
from boredcharts.cli.discover import get_import_string | ||
from boredcharts.pdf import UrlToPdfFile, print_to_pdf_manual | ||
|
||
|
||
class Report(NamedTuple): | ||
name: str | ||
urlpath: str | ||
tag: str | ||
|
||
|
||
def get_report_url( | ||
path: Path | None, | ||
app_name: str | None, | ||
name: str, | ||
) -> str: | ||
import_str = get_import_string(path=path, app_name=app_name) # mutates sys.path | ||
mod = importlib.import_module(import_str.split(":")[0]) | ||
app = getattr(mod, import_str.split(":")[1]) | ||
assert isinstance(app, FastAPI) | ||
return app.url_path_for(name) | ||
|
||
|
||
def get_reports( | ||
path: Path | None, | ||
app_name: str | None, | ||
) -> list[Report]: | ||
import_str = get_import_string(path=path, app_name=app_name) # mutates sys.path | ||
mod = importlib.import_module(import_str.split(":")[0]) | ||
app = getattr(mod, import_str.split(":")[1]) | ||
assert isinstance(app, FastAPI) | ||
openapi = app.openapi() | ||
paths = openapi["paths"] | ||
|
||
reports: list[Report] = [] | ||
for urlpath, methods in paths.items(): | ||
assert urlpath is not None | ||
for method, data in methods.items(): | ||
if method != "get": | ||
continue | ||
|
||
tags = data.get("tags") | ||
if tags is None: | ||
continue | ||
assert isinstance(tags, list) | ||
tags = [tag for tag in tags if tag.startswith("report")] | ||
if not tags: | ||
continue | ||
|
||
name = data.get("summary") | ||
assert isinstance(name, str) | ||
name = name.lower().replace(" ", "_") | ||
if name.startswith("index"): | ||
continue | ||
|
||
for tag in tags: | ||
reports.append(Report(name=name, urlpath=urlpath, tag=tag)) | ||
|
||
return reports | ||
|
||
|
||
def _run_uvicorn( | ||
path: Path | None, | ||
app_name: str | None, | ||
reload: bool = False, | ||
host: str = "127.0.0.1", | ||
port: int = 4000, | ||
log_level: Literal[ | ||
"critical", | ||
"error", | ||
"warning", | ||
"info", | ||
"debug", | ||
"trace", | ||
] = "info", | ||
) -> None: | ||
import_str = get_import_string(path=path, app_name=app_name) | ||
uvicorn.run( | ||
import_str, | ||
host=host, | ||
port=port, | ||
proxy_headers=True, | ||
forwarded_allow_ips="*", | ||
reload=reload, | ||
log_level=log_level, | ||
) | ||
|
||
|
||
def init( | ||
path: Path | None, | ||
app_name: str | None, | ||
) -> None: | ||
"""create a new project scaffolding""" | ||
raise NotImplementedError | ||
|
||
|
||
def list_reports( | ||
path: Path | None, | ||
app_name: str | None, | ||
) -> None: | ||
"""list available reports""" | ||
reports = get_reports(path, app_name) | ||
reports = sorted(reports, key=lambda x: x[2]) | ||
urlpathwidth = max(len(r.urlpath) for r in reports) | ||
name = max(len(r.name) for r in reports) | ||
tagwidth = max(len(r.tag) for r in reports) | ||
print( | ||
f"{"CATEGORY".ljust(tagwidth)} {"REPORT".ljust(name)} {"URL".ljust(urlpathwidth)}" | ||
) | ||
for r in reports: | ||
print( | ||
f"{r.tag.ljust(tagwidth)} {r.name.ljust(name)} {r.urlpath.ljust(urlpathwidth)}" | ||
) | ||
|
||
|
||
def export( | ||
path: Path | None, | ||
app_name: str | None, | ||
report: str, | ||
*, | ||
exporter: UrlToPdfFile = print_to_pdf_manual, | ||
) -> None: | ||
"""write to pdf | ||
TODO: | ||
- [x] write to pdf | ||
- [x] spin up server | ||
- [x] provide list of reports | ||
""" | ||
try: | ||
route = get_report_url(path, app_name, report) | ||
except NoMatchFound: | ||
print(f"Report {report} not found!") | ||
print("Use `boredcharts list` to see available reports.") | ||
raise SystemExit(1) | ||
|
||
host = "127.0.0.1" | ||
port = 4001 # different port just for exports | ||
base_url = f"http://{host}:{port}" | ||
process = multiprocessing.Process( | ||
target=_run_uvicorn, | ||
kwargs=dict( | ||
path=path, | ||
app_name=app_name, | ||
reload=False, | ||
host=host, | ||
port=port, | ||
log_level="warning", | ||
), | ||
) | ||
|
||
print("Spinning up boredcharts app", end="", flush=True) | ||
process.start() | ||
for _ in range(10): | ||
print(".", end="", flush=True) | ||
time.sleep(0.1) | ||
try: | ||
resp = httpx.get(f"{base_url}/healthz") | ||
except httpx.HTTPError: | ||
continue | ||
if resp.is_success: | ||
print(" started!") | ||
break | ||
else: | ||
print(" health check failed!") | ||
raise Exception("Couldn't start app!") | ||
|
||
url = f"{base_url}{route}" | ||
file = Path(report.replace(".", "-")).absolute().with_suffix(".pdf") | ||
asyncio.run(exporter(url, file)) | ||
print(f"Exported {report} to {file}") | ||
|
||
process.terminate() | ||
|
||
|
||
def dev(path: Path | None, app_name: str | None) -> None: | ||
"""run uvicorn with reload""" | ||
_run_uvicorn(path, app_name, reload=True) | ||
|
||
|
||
def run(path: Path | None, app_name: str | None) -> None: | ||
"""run uvicorn without reload""" | ||
_run_uvicorn(path, app_name, reload=False) | ||
|
||
|
||
def main() -> None: | ||
"""cli entrypoint""" | ||
parser = argparse.ArgumentParser(description="boredcharts CLI") | ||
parser.add_argument("path", type=Path, default=None, help="Path to FastAPI app") | ||
parser.add_argument("--app-name", type=str, default=None, help="FastAPI app name") | ||
|
||
subparsers = parser.add_subparsers(dest="command") | ||
subparsers.required = True | ||
|
||
parser_init = subparsers.add_parser("init", help="Create a new project scaffolding") | ||
parser_init.set_defaults(func=init) | ||
|
||
parser_init = subparsers.add_parser("list", help="List available reports") | ||
parser_init.set_defaults(func=list_reports) | ||
|
||
parser_export = subparsers.add_parser("export", help="Write report to PDF") | ||
parser_export.add_argument("report", type=str, help="The report to export") | ||
parser_export.set_defaults(func=export) | ||
|
||
parser_dev = subparsers.add_parser("dev", help="Run uvicorn with reload") | ||
parser_dev.set_defaults(func=dev) | ||
|
||
parser_serve = subparsers.add_parser("run", help="Run uvicorn without reload") | ||
parser_serve.set_defaults(func=run) | ||
|
||
args = parser.parse_args() | ||
|
||
func_args = {k: v for k, v in vars(args).items() if k != "func" and k != "command"} | ||
args.func(**func_args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
"""forked from fastapi_cli.discover because i needed to get rid of the print statements""" | ||
|
||
import importlib | ||
import sys | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from typing import Union | ||
|
||
from fastapi import FastAPI | ||
|
||
|
||
def get_default_path() -> Path: | ||
potential_paths = ( | ||
"main.py", | ||
"app.py", | ||
"api.py", | ||
"app/main.py", | ||
"app/app.py", | ||
"app/api.py", | ||
) | ||
|
||
for full_path in potential_paths: | ||
path = Path(full_path) | ||
if path.is_file(): | ||
return path | ||
|
||
raise Exception( | ||
"Could not find a default file to run, please provide an explicit path" | ||
) | ||
|
||
|
||
@dataclass | ||
class ModuleData: | ||
module_import_str: str | ||
extra_sys_path: Path | ||
|
||
|
||
def get_module_data_from_path(path: Path) -> ModuleData: | ||
use_path = path.resolve() | ||
module_path = use_path | ||
if use_path.is_file() and use_path.stem == "__init__": | ||
module_path = use_path.parent | ||
module_paths = [module_path] | ||
extra_sys_path = module_path.parent | ||
for parent in module_path.parents: | ||
init_path = parent / "__init__.py" | ||
if init_path.is_file(): | ||
module_paths.insert(0, parent) | ||
extra_sys_path = parent.parent | ||
else: | ||
break | ||
module_str = ".".join(p.stem for p in module_paths) | ||
return ModuleData( | ||
module_import_str=module_str, extra_sys_path=extra_sys_path.resolve() | ||
) | ||
|
||
|
||
def get_app_name(*, mod_data: ModuleData, app_name: Union[str, None] = None) -> str: | ||
try: | ||
mod = importlib.import_module(mod_data.module_import_str) | ||
except (ImportError, ValueError): | ||
raise # missing __init__.py? | ||
object_names = dir(mod) | ||
object_names_set = set(object_names) | ||
if app_name: | ||
if app_name not in object_names_set: | ||
raise Exception( | ||
f"Could not find app name {app_name} in {mod_data.module_import_str}" | ||
) | ||
app = getattr(mod, app_name) | ||
if not isinstance(app, FastAPI): | ||
raise Exception( | ||
f"The app name {app_name} in {mod_data.module_import_str} doesn't seem to be a FastAPI app" | ||
) | ||
return app_name | ||
for preferred_name in ["app", "api"]: | ||
if preferred_name in object_names_set: | ||
obj = getattr(mod, preferred_name) | ||
if isinstance(obj, FastAPI): | ||
return preferred_name | ||
for name in object_names: | ||
obj = getattr(mod, name) | ||
if isinstance(obj, FastAPI): | ||
return name | ||
raise Exception("Could not find FastAPI app in module, try using --app") | ||
|
||
|
||
def get_import_string(*, path: Path | None = None, app_name: str | None = None) -> str: | ||
if not path: | ||
path = get_default_path() | ||
if not path.exists(): | ||
raise Exception(f"Path does not exist {path}") | ||
mod_data = get_module_data_from_path(path) | ||
sys.path.insert(0, str(mod_data.extra_sys_path)) | ||
use_app_name = get_app_name(mod_data=mod_data, app_name=app_name) | ||
import_string = f"{mod_data.module_import_str}:{use_app_name}" | ||
return import_string |
Oops, something went wrong.