-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add monkey patching of typer and click-repl (#98)
* Add monkeypatching of typer and click-repl * Add help text changelog entry
- Loading branch information
Showing
6 changed files
with
370 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from __future__ import annotations | ||
|
||
from harbor_cli._patches import click_repl | ||
from harbor_cli._patches import typer | ||
|
||
|
||
def patch_all() -> None: | ||
"""Apply all patches to all modules.""" | ||
typer.patch() | ||
click_repl.patch() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
"""Patches for the click_repl package.""" | ||
|
||
from __future__ import annotations | ||
|
||
import shlex | ||
import sys | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Optional | ||
|
||
import click | ||
import click_repl # type: ignore | ||
from click.exceptions import Exit as ClickExit | ||
from click_repl import ExitReplException # type: ignore | ||
from click_repl import bootstrap_prompt # type: ignore | ||
from click_repl import dispatch_repl_commands # type: ignore | ||
from click_repl import handle_internal_commands # type: ignore | ||
from prompt_toolkit.shortcuts import prompt | ||
|
||
from harbor_cli._patches.common import get_patcher | ||
from harbor_cli.exceptions import handle_exception | ||
|
||
patcher = get_patcher(f"click_repl version: {click_repl.__version__}") | ||
|
||
|
||
def patch_exception_handling() -> None: | ||
"""Patch click_repl's exception handling to fall back on the | ||
CLI's exception handlers instead of propagating them. | ||
Without this patch, any exceptions other than SystemExit and ClickExit | ||
will cause the REPL to exit. This is not desirable, as we want | ||
raise exceptions for control flow purposes in commands to abort them, | ||
but not terminate the CLi completely. | ||
A failed command should return to the REPL prompt instead of exiting | ||
the REPL. | ||
""" | ||
|
||
def repl( # noqa: C901 | ||
old_ctx: click.Context, | ||
prompt_kwargs: Optional[Dict[str, Any]] = None, | ||
allow_system_commands: bool = True, | ||
allow_internal_commands: bool = True, | ||
) -> Any: | ||
""" | ||
Start an interactive shell. All subcommands are available in it. | ||
:param old_ctx: The current Click context. | ||
:param prompt_kwargs: Parameters passed to | ||
:py:func:`prompt_toolkit.shortcuts.prompt`. | ||
If stdin is not a TTY, no prompt will be printed, but only commands read | ||
from stdin. | ||
""" | ||
# parent should be available, but we're not going to bother if not | ||
group_ctx = old_ctx.parent or old_ctx | ||
group = group_ctx.command | ||
isatty = sys.stdin.isatty() | ||
|
||
# Delete the REPL command from those available, as we don't want to allow | ||
# nesting REPLs (note: pass `None` to `pop` as we don't want to error if | ||
# REPL command already not present for some reason). | ||
repl_command_name = old_ctx.command.name | ||
if isinstance(group_ctx.command, click.CommandCollection): | ||
available_commands = { # type: ignore | ||
cmd_name: cmd_obj | ||
for source in group_ctx.command.sources | ||
for cmd_name, cmd_obj in source.commands.items() # type: ignore | ||
} | ||
else: | ||
available_commands = group_ctx.command.commands # type: ignore | ||
available_commands.pop(repl_command_name, None) # type: ignore | ||
|
||
prompt_kwargs = bootstrap_prompt(prompt_kwargs, group) | ||
|
||
if isatty: | ||
|
||
def get_command(): | ||
return prompt(**prompt_kwargs) | ||
|
||
else: | ||
get_command = sys.stdin.readline | ||
|
||
while True: | ||
try: | ||
command = get_command() | ||
except KeyboardInterrupt: | ||
continue | ||
except EOFError: | ||
break | ||
|
||
if not command: | ||
if isatty: | ||
continue | ||
else: | ||
break | ||
|
||
if allow_system_commands and dispatch_repl_commands(command): | ||
continue | ||
|
||
if allow_internal_commands: | ||
try: | ||
result = handle_internal_commands(command) # type: ignore | ||
if isinstance(result, str): | ||
click.echo(result) | ||
continue | ||
except ExitReplException: | ||
break | ||
|
||
try: | ||
args = shlex.split(command) | ||
except ValueError as e: | ||
click.echo(f"{type(e).__name__}: {e}") | ||
continue | ||
|
||
try: | ||
with group.make_context(None, args, parent=group_ctx) as ctx: | ||
group.invoke(ctx) | ||
ctx.exit() | ||
except click.ClickException as e: | ||
e.show() | ||
except ClickExit: | ||
pass | ||
except SystemExit: | ||
pass | ||
except ExitReplException: | ||
break | ||
# PATCH: Patched to handle zabbix-cli exceptions | ||
except Exception as e: | ||
try: | ||
handle_exception(e) # this could be dangerous? Infinite looping? | ||
except SystemExit: | ||
pass | ||
# PATCH: Patched to continue on keyboard interrupt | ||
except KeyboardInterrupt: | ||
from harbor_cli.output.console import err_console | ||
|
||
# User likely pressed Ctrl+C during a prompt or when a spinner | ||
# was active. Ensure message is printed on a new line. | ||
# TODO: determine if last char in terminal was newline somehow! Can we? | ||
err_console.print("\n[red]Aborted.[/]") | ||
pass | ||
|
||
with patcher("click_repl.repl"): | ||
click_repl.repl = repl | ||
|
||
|
||
def patch() -> None: | ||
"""Apply all patches.""" | ||
patch_exception_handling() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from __future__ import annotations | ||
|
||
from abc import ABC | ||
from abc import abstractmethod | ||
from typing import TYPE_CHECKING | ||
|
||
if TYPE_CHECKING: | ||
from types import TracebackType | ||
from typing import Optional | ||
from typing import Type | ||
|
||
|
||
class BasePatcher(ABC): | ||
"""Context manager that logs and prints diagnostic info if an exception | ||
occurs.""" | ||
|
||
def __init__(self, description: str) -> None: | ||
self.description = description | ||
|
||
@abstractmethod | ||
def __package_info__(self) -> str: | ||
raise NotImplementedError | ||
|
||
def __enter__(self) -> BasePatcher: | ||
return self | ||
|
||
def __exit__( | ||
self, | ||
exc_type: Optional[Type[BaseException]], | ||
exc_val: Optional[BaseException], | ||
exc_tb: Optional[TracebackType], | ||
) -> bool: | ||
if not exc_type: | ||
return True | ||
import sys | ||
|
||
import rich | ||
from rich.table import Table | ||
|
||
from harbor_cli.__about__ import __version__ | ||
|
||
# Rudimentary, but provides enough info to debug and fix the issue | ||
console = rich.console.Console(stderr=True) | ||
console.print_exception() | ||
console.print() | ||
table = Table( | ||
title="Diagnostics", | ||
show_header=False, | ||
show_lines=False, | ||
) | ||
table.add_row( | ||
"[b]Package [/]", | ||
self.__package_info__(), | ||
) | ||
table.add_row( | ||
"[b]zabbix-cli [/]", | ||
__version__, | ||
) | ||
table.add_row( | ||
"[b]Python [/]", | ||
sys.version, | ||
) | ||
table.add_row( | ||
"[b]Platform [/]", | ||
sys.platform, | ||
) | ||
console.print(table) | ||
console.print(f"[bold red]ERROR: Failed to patch {self.description}[/]") | ||
return True # suppress exception | ||
|
||
|
||
def get_patcher(info: str) -> Type[BasePatcher]: | ||
"""Returns a patcher for a given package.""" | ||
|
||
class Patcher(BasePatcher): | ||
def __package_info__(self) -> str: | ||
return info | ||
|
||
return Patcher |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""Monkeypatches Typer to extend certain functionality and change the | ||
styling of its output. | ||
Will probably break for some version of Typer at some point.""" | ||
|
||
from __future__ import annotations | ||
|
||
import inspect | ||
from typing import Iterable | ||
from typing import Union | ||
|
||
import click | ||
import typer | ||
|
||
from harbor_cli._patches.common import get_patcher | ||
|
||
patcher = get_patcher(f"Typer version: {typer.__version__}") | ||
|
||
|
||
def patch_help_text_style() -> None: | ||
"""Remove dimming of help text. | ||
https://github.com/tiangolo/typer/issues/437#issuecomment-1224149402 | ||
""" | ||
with patcher("typer.rich_utils.STYLE_HELPTEXT"): | ||
typer.rich_utils.STYLE_HELPTEXT = "" # type: ignore | ||
|
||
|
||
def patch_help_text_spacing() -> None: | ||
"""Adds a single blank line between short and long help text of a command | ||
when using `--help`. | ||
As of Typer 0.9.0, the short and long help text is printed without any | ||
blank lines between them. This is bad for readability (IMO). | ||
""" | ||
from rich.console import group | ||
from rich.markdown import Markdown | ||
from rich.text import Text | ||
from typer.rich_utils import DEPRECATED_STRING | ||
from typer.rich_utils import MARKUP_MODE_MARKDOWN | ||
from typer.rich_utils import MARKUP_MODE_RICH | ||
from typer.rich_utils import STYLE_DEPRECATED | ||
from typer.rich_utils import STYLE_HELPTEXT | ||
from typer.rich_utils import STYLE_HELPTEXT_FIRST_LINE | ||
from typer.rich_utils import MarkupMode | ||
from typer.rich_utils import _make_rich_rext # type: ignore | ||
|
||
@group() | ||
def _get_help_text( | ||
*, | ||
obj: Union[click.Command, click.Group], | ||
markup_mode: MarkupMode, | ||
) -> Iterable[Union[Markdown, Text]]: | ||
"""Build primary help text for a click command or group. | ||
Returns the prose help text for a command or group, rendered either as a | ||
Rich Text object or as Markdown. | ||
If the command is marked as deprecated, the deprecated string will be prepended. | ||
""" | ||
# Prepend deprecated status | ||
if obj.deprecated: | ||
yield Text(DEPRECATED_STRING, style=STYLE_DEPRECATED) | ||
|
||
# Fetch and dedent the help text | ||
help_text = inspect.cleandoc(obj.help or "") | ||
|
||
# Trim off anything that comes after \f on its own line | ||
help_text = help_text.partition("\f")[0] | ||
|
||
# Get the first paragraph | ||
first_line = help_text.split("\n\n")[0] | ||
# Remove single linebreaks | ||
if markup_mode != MARKUP_MODE_MARKDOWN and not first_line.startswith("\b"): | ||
first_line = first_line.replace("\n", " ") | ||
yield _make_rich_rext( | ||
text=first_line.strip(), | ||
style=STYLE_HELPTEXT_FIRST_LINE, | ||
markup_mode=markup_mode, | ||
) | ||
|
||
# Get remaining lines, remove single line breaks and format as dim | ||
remaining_paragraphs = help_text.split("\n\n")[1:] | ||
if remaining_paragraphs: | ||
if markup_mode != MARKUP_MODE_RICH: | ||
# Remove single linebreaks | ||
remaining_paragraphs = [ | ||
( | ||
x.replace("\n", " ").strip() | ||
if not x.startswith("\b") | ||
else "{}\n".format(x.strip("\b\n")) | ||
) | ||
for x in remaining_paragraphs | ||
] | ||
# Join back together | ||
remaining_lines = "\n".join(remaining_paragraphs) | ||
else: | ||
# Join with double linebreaks if markdown | ||
remaining_lines = "\n\n".join(remaining_paragraphs) | ||
yield _make_rich_rext( | ||
text="\n", | ||
style=STYLE_HELPTEXT, | ||
markup_mode=markup_mode, | ||
) | ||
yield _make_rich_rext( | ||
text=remaining_lines, | ||
style=STYLE_HELPTEXT, | ||
markup_mode=markup_mode, | ||
) | ||
|
||
with patcher("typer.rich_utils._get_help_text"): | ||
typer.rich_utils._get_help_text = _get_help_text # type: ignore | ||
|
||
|
||
def patch() -> None: | ||
"""Apply all patches.""" | ||
patch_help_text_style() | ||
patch_help_text_spacing() |