8
8
"""
9
9
10
10
import logging
11
+ import time
11
12
from typing import Dict , Optional
12
-
13
13
from httpx import AsyncClient
14
14
from pydantic import BaseModel , Field
15
15
16
16
logger = logging .getLogger (__name__ )
17
17
logger .setLevel (logging .INFO )
18
18
19
# User-facing message templates, keyed first by language code and then by
# message key. Every value is a ``str.format`` template; the placeholder names
# are the keyword arguments callers pass to ``Filter.get_text``.
TRANSLATIONS = {
    "en": {
        "request_failed": "Request failed: {error_msg}",
        "insufficient_balance": "Insufficient balance: Current balance `{balance:.4f}`",
        "cost": "Cost: ${cost:.4f}",
        "balance": "Balance: ${balance:.4f}",
        "tokens": "Tokens: {input}+{output}",
        "time_spent": "Time: {time:.2f}s",
        "tokens_per_sec": "{tokens_per_sec:.2f} T/s",
    },
    "zh": {
        "request_failed": "请求失败: {error_msg}",
        "insufficient_balance": "余额不足: 当前余额 `{balance:.4f}`",
        "cost": "费用: ¥{cost:.4f}",
        "balance": "余额: ¥{balance:.4f}",
        "tokens": "Token: {input}+{output}",
        "time_spent": "耗时: {time:.2f}s",
        "tokens_per_sec": "{tokens_per_sec:.2f} T/s",
    },
}
19
39
20
40
class CustomException(Exception):
    """Error raised by the filter with a ready-to-display message.

    ``Filter.inlet`` re-raises this type unchanged, whereas any other
    exception is wrapped in a generic "error calculating usage" error.
    """
22
42
23
-
24
43
class Filter :
25
44
class Valves(BaseModel):
    """Configurable settings of the filter."""

    # Connection to the openwebui-monitor service.
    api_endpoint: str = Field(
        default="",
        description="openwebui-monitor's base url",
    )
    api_key: str = Field(
        default="",
        description="openwebui-monitor's api key",
    )
    priority: int = Field(
        default=5,
        description="filter priority",
    )

    # Language code looked up in TRANSLATIONS by Filter.get_text.
    language: str = Field(
        default="zh",
        description="language (en/zh)",
    )

    # Toggles for the individual parts of the status line built in outlet().
    show_time_spent: bool = Field(
        default=True,
        description="show time spent",
    )
    show_tokens_per_sec: bool = Field(
        default=True,
        description="show tokens per second",
    )
    show_cost: bool = Field(
        default=True,
        description="show cost",
    )
    show_balance: bool = Field(
        default=True,
        description="show balance",
    )
    show_tokens: bool = Field(
        default=True,
        description="show tokens",
    )
29
54
30
55
def __init__(self):
    """Initialise filter metadata, default valves and bookkeeping state."""
    # Identification attributes of this function.
    self.type = "filter"
    self.name = "OpenWebUI Monitor"

    # Settings start at the defaults declared on Valves.
    self.valves = self.Valves()

    # user id -> True when the last inlet() balance check found balance <= 0;
    # outlet() skips reporting for users flagged here.
    self.outage_map: Dict[str, bool] = {}

    # time.time() stamp written by inlet() and read by outlet().
    # NOTE(review): stored on the instance, so every inlet() call overwrites
    # it - confirm this is acceptable when requests overlap.
    self.start_time: Optional[float] = None
61
+
62
def get_text(self, key: str, **kwargs) -> str:
    """Return the message for ``key`` in the configured language.

    The language is taken from ``self.valves.language``; a code without an
    entry in ``TRANSLATIONS`` selects English. A key missing from the
    selected language also falls back to the English table.

    Args:
        key: Message identifier, e.g. ``"cost"``.
        **kwargs: Values for the template's ``str.format`` placeholders.
            When none are given the template is returned unformatted.

    Returns:
        The localized message, formatted when keyword arguments were given.

    Raises:
        KeyError: If ``key`` exists in neither the selected language nor
            English, or a placeholder has no matching keyword argument.
    """
    lang = self.valves.language if self.valves.language in TRANSLATIONS else "en"
    table = TRANSLATIONS[lang]
    # Consult the English table only when it is actually needed. Passing
    # TRANSLATIONS["en"][key] as the default of dict.get() evaluates it
    # eagerly and raises KeyError for keys that exist only in ``table``.
    text = table[key] if key in table else TRANSLATIONS["en"][key]
    return text.format(**kwargs) if kwargs else text
34
66
35
67
async def request(self, client: AsyncClient, url: str, headers: dict, json: dict):
    """POST ``json`` to the monitor service and return the decoded reply.

    Args:
        client: HTTP client used to send the request. It is not closed here.
        url: Target URL of the POST request.
        headers: HTTP headers sent with the request.
        json: Payload sent as the JSON request body.

    Returns:
        The JSON-decoded response body.

    Raises:
        CustomException: If the decoded response has no truthy ``"success"``
            entry. The message is the localized ``request_failed`` text.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            error status (``httpx.HTTPStatusError`` according to the httpx
            documentation).
    """
    response = await client.post(url=url, headers=headers, json=json)
    response.raise_for_status()
    response_data = response.json()
    if not response_data.get("success"):
        # Build the message once, with the exact key present in TRANSLATIONS,
        # so the log line and the raised error are guaranteed to match and a
        # mistyped key cannot mask the CustomException with a KeyError.
        message = self.get_text("request_failed", error_msg=response_data)
        logger.error(message)
        raise CustomException(message)
    return response_data
43
75
44
76
async def inlet (self , body : dict , __metadata__ : Optional [dict ] = None , __user__ : Optional [dict ] = None ) -> dict :
45
77
__user__ = __user__ or {}
46
78
__metadata__ = __metadata__ or {}
47
- user_id = __user__ ["id" ]
79
+ self .start_time = time .time ()
80
+ user_id = __user__ .get ("id" , "default" )
48
81
49
82
client = AsyncClient ()
50
-
51
83
try :
52
84
response_data = await self .request (
53
85
client = client ,
@@ -57,36 +89,26 @@ async def inlet(self, body: dict, __metadata__: Optional[dict] = None, __user__:
57
89
)
58
90
self .outage_map [user_id ] = response_data .get ("balance" , 0 ) <= 0
59
91
if self .outage_map [user_id ]:
60
- logger .info ("[usage_monitor] no balance: %s" , user_id )
61
- raise CustomException ("no balance, please contact administrator" )
62
-
92
+ logger .info (self .get_text ("insufficient_balance" , balance = response_data .get ("balance" , 0 )))
93
+ raise CustomException (self .get_text ("insufficient_balance" , balance = response_data .get ("balance" , 0 )))
63
94
return body
64
-
65
95
except Exception as err :
66
- logger .exception ("[usage_monitor] error calculating usage: %s " , err )
96
+ logger .exception (self . get_text ( "request_failed " , error_msg = err ) )
67
97
if isinstance (err , CustomException ):
68
98
raise err
69
99
raise Exception (f"error calculating usage, { err } " ) from err
70
-
71
100
finally :
72
101
await client .aclose ()
73
102
74
- async def outlet (
75
- self ,
76
- body : dict ,
77
- __metadata__ : Optional [dict ] = None ,
78
- __user__ : Optional [dict ] = None ,
79
- __event_emitter__ : callable = None ,
80
- ) -> dict :
103
+ async def outlet (self , body : dict , __metadata__ : Optional [dict ] = None , __user__ : Optional [dict ] = None , __event_emitter__ : Optional [callable ] = None ) -> dict :
81
104
__user__ = __user__ or {}
82
105
__metadata__ = __metadata__ or {}
83
- user_id = __user__ [ "id" ]
106
+ user_id = __user__ . get ( "id" , "default" )
84
107
85
- if self .outage_map [ user_id ] :
108
+ if self .outage_map . get ( user_id , False ) :
86
109
return body
87
110
88
111
client = AsyncClient ()
89
-
90
112
try :
91
113
response_data = await self .request (
92
114
client = client ,
@@ -95,23 +117,27 @@ async def outlet(
95
117
json = {"user" : __user__ , "body" : body },
96
118
)
97
119
98
- # pylint: disable=C0209
99
- stats = " | " .join (
100
- [
101
- f"Tokens: { response_data ['inputTokens' ]} + { response_data ['outputTokens' ]} " ,
102
- "Cost: %.4f" % response_data ["totalCost" ],
103
- "Balance: %.4f" % response_data ["newBalance" ],
104
- ]
105
- )
106
-
107
- await __event_emitter__ ({"type" : "status" , "data" : {"description" : stats , "done" : True }})
108
-
120
+ stats_list = []
121
+ if self .valves .show_tokens :
122
+ stats_list .append (self .get_text ("tokens" , input = response_data ['inputTokens' ], output = response_data ['outputTokens' ]))
123
+ if self .valves .show_cost :
124
+ stats_list .append (self .get_text ("cost" , cost = response_data ["totalCost" ]))
125
+ if self .valves .show_balance :
126
+ stats_list .append (self .get_text ("balance" , balance = response_data ["newBalance" ]))
127
+ if self .start_time and self .valves .show_time_spent :
128
+ elapsed = time .time () - self .start_time
129
+ stats_list .append (self .get_text ("time_spent" , time = elapsed ))
130
+ if self .valves .show_tokens_per_sec :
131
+ tokens_per_sec = response_data ['outputTokens' ] / elapsed if elapsed > 0 else 0
132
+ stats_list .append (self .get_text ("tokens_per_sec" , tokens_per_sec = tokens_per_sec ))
133
+
134
+ stats = " | " .join (stats_list )
135
+ if __event_emitter__ :
136
+ await __event_emitter__ ({"type" : "status" , "data" : {"description" : stats , "done" : True }})
109
137
logger .info ("usage_monitor: %s %s" , user_id , stats )
110
138
return body
111
-
112
139
except Exception as err :
113
- logger .exception ("[usage_monitor] error calculating usage: %s" , err )
114
- raise Exception (f"error calculating usage, { err } " ) from err
115
-
140
+ logger .exception (self .get_text ("request_failed" , error_msg = err ))
141
+ raise Exception (self .get_text ("request_failed" , error_msg = err ))
116
142
finally :
117
143
await client .aclose ()
0 commit comments