1
1
import copy
2
2
import importlib .util
3
3
import inspect
4
+ import json
4
5
import os
5
6
import sys
6
7
import tempfile
7
8
from importlib .metadata import PackageNotFoundError , version
8
9
from pathlib import Path
9
- from typing import Any , Callable , Optional
10
+ from types import ModuleType
11
+ from typing import Any , Callable , Optional , Union , get_args , get_origin , get_type_hints
10
12
11
13
import click
12
14
import inquirer
@@ -27,7 +29,7 @@ class PluginError(Exception):
27
29
28
30
29
31
hook_registry : set [Callable [..., Any ]] = set ()
30
- imported_modules = {}
32
+ imported_modules : dict [ str , ModuleType ] = {}
31
33
32
34
33
35
@click .group (name = "plugin" )
@@ -97,10 +99,11 @@ def toggle(plugin: str):
97
99
98
100
99
101
@plugin .command ()
100
- @click .argument ("plugin" , type = str )
101
- @click .argument ("function" , type = str )
102
- @click .argument ("args" , nargs = - 1 , type = str ) # Accepts zero or more arguments
103
- def run (plugin : str , function : str , args : tuple [str , ...]):
102
+ @click .argument ("plugin" , type = str , default = "" )
103
+ @click .argument ("function" , type = str , default = "" )
104
+ @click .option ("--args" , default = "" , type = str , help = "Apply positional arguments to the function" )
105
+ @click .option ("--json-dict" , default = "" , type = str , help = "Use json dict to populate parameters" )
106
+ def run (plugin : str , function : str , args : tuple [str , ...], json_dict : str ):
104
107
"""Run a command available in a plugin"""
105
108
plugin_dir = _get_plugin_directory ()
106
109
plugins = get_plugins_with_status (plugin_dir )
@@ -110,16 +113,106 @@ def run(plugin: str, function: str, args: tuple[str, ...]):
110
113
click .secho ("Please toggle it on to run commands." )
111
114
sys .exit (0 )
112
115
113
- module = imported_modules .get (f"plugins.{ plugin } " )
114
- if hasattr (module , function ):
115
- func = getattr (module , function )
116
+ if plugin == "" :
117
+ plugin_names = [
118
+ plugin_name .stem for plugin_name , status in get_plugins_with_status () if status
119
+ ]
120
+
121
+ q = [inquirer .List (name = "plugin" , message = "Please choose a plugin" , choices = plugin_names )]
122
+ plugin = inquirer .prompt (q , theme = GreenPassion ()).get ("plugin" )
123
+
124
+ if function == "" :
125
+ module = imported_modules .get (f"plugins.{ plugin } " )
126
+ funcs = [name for name , _func in inspect .getmembers (module , inspect .isfunction )]
127
+ q = [inquirer .List (name = "func" , message = "Please choose a function" , choices = funcs )]
128
+ function = inquirer .prompt (q , theme = GreenPassion ()).get ("func" )
129
+
130
+ func = get_func (function_name = function , plugin_name = plugin )
131
+ hints = get_type_hints (func )
132
+ if not func :
133
+ sys .exit (0 )
134
+
135
+ if args :
136
+ func (* args )
137
+ sys .exit (0 )
138
+
139
+ if not json_dict :
140
+ params = {}
141
+ sig = inspect .signature (func )
142
+ for name , param in sig .parameters .items ():
143
+ hint = hints .get (name )
144
+ hint_name = get_type_name (hint )
145
+ if param .default != inspect .Parameter .empty :
146
+ q = [
147
+ inquirer .Text (
148
+ "input" ,
149
+ message = f"Enter a value for '{ name } ' ({ hint_name } )" ,
150
+ default = param .default ,
151
+ )
152
+ ]
153
+ else :
154
+ q = [
155
+ inquirer .Text (
156
+ "input" ,
157
+ message = f"Enter a value for '{ name } ' ({ hint_name } )" ,
158
+ )
159
+ ]
160
+ user_input = inquirer .prompt (q ).get ("input" )
161
+ params [name ] = cast_to_hint (user_input , hint )
162
+ click .secho (
163
+ f"\n warnet plugin run { plugin } { function } --json-dict '{ json .dumps (params )} '\n " ,
164
+ fg = "green" ,
165
+ )
166
+ else :
167
+ params = json .loads (json_dict )
168
+
169
+ func (** params )
170
+
171
+
172
+ def cast_to_hint (value : str , hint : Any ) -> Any :
173
+ """
174
+ Cast a string value to the provided type hint.
175
+ """
176
+ origin = get_origin (hint )
177
+ args = get_args (hint )
178
+
179
+ # Handle basic types (int, str, float, etc.)
180
+ if origin is None :
181
+ return hint (value )
182
+
183
+ # Handle Union (e.g., Union[int, str])
184
+ if origin is Union :
185
+ for arg in args :
186
+ try :
187
+ return cast_to_hint (value , arg )
188
+ except (ValueError , TypeError ):
189
+ continue
190
+ raise ValueError (f"Cannot cast { value } to { hint } " )
191
+
192
+ # Handle Lists (e.g., List[int])
193
+ if origin is list :
194
+ return [cast_to_hint (v .strip (), args [0 ]) for v in value .split ("," )]
195
+
196
+ raise ValueError (f"Unsupported hint: { hint } " )
197
+
198
+
199
+ def get_type_name (type_hint ) -> str :
200
+ if hasattr (type_hint , "__name__" ):
201
+ return type_hint .__name__
202
+ return str (type_hint )
203
+
204
+
205
+ def get_func (function_name : str , plugin_name : str ) -> Optional [Callable [..., Any ]]:
206
+ module = imported_modules .get (f"plugins.{ plugin_name } " )
207
+ if hasattr (module , function_name ):
208
+ func = getattr (module , function_name )
116
209
if callable (func ):
117
- result = func (* args )
118
- print (result )
210
+ return func
119
211
else :
120
- click .secho (f"{ function } in { module } is not callable." )
212
+ click .secho (f"{ function_name } in { module } is not callable." )
121
213
else :
122
- click .secho (f"Could not find { function } in { module } " )
214
+ click .secho (f"Could not find { function_name } in { module } " )
215
+ return None
123
216
124
217
125
218
def api (func : Callable [..., Any ]) -> Callable [..., Any ]:
@@ -322,7 +415,9 @@ def check_if_plugin_enabled(path: Path) -> bool:
322
415
return bool (enabled )
323
416
324
417
325
- def get_plugins_with_status (plugin_dir : Path ) -> list [tuple [Path , bool ]]:
418
+ def get_plugins_with_status (plugin_dir : Optional [Path ] = None ) -> list [tuple [Path , bool ]]:
419
+ if not plugin_dir :
420
+ plugin_dir = _get_plugin_directory ()
326
421
candidates = [
327
422
Path (os .path .join (plugin_dir , name ))
328
423
for name in os .listdir (plugin_dir )
0 commit comments