-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLoggingStuff.py
More file actions
157 lines (131 loc) · 5.07 KB
/
LoggingStuff.py
File metadata and controls
157 lines (131 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import asyncio
from datetime import datetime
import logging
from logging import FileHandler, LogRecord, StreamHandler
import os
from queue import Queue
from threading import Thread
import json
from discord_webhook import DiscordWebhook, AsyncDiscordWebhook
# Name of the application logger and base name of the log file.
CODENAME = "pate-trigger-sv-mgr"

# The webhook URL and the role id live outside the source in secrets.json,
# looked up relative to the current working directory.  The file is decoded
# as UTF-8 explicitly so parsing does not depend on the platform's locale
# encoding.
with open("secrets.json", "r", encoding="utf-8") as f:
    secrets = json.load(f)
DISCORD_WEBHOOK_URL = secrets["DISCORD_WEBHOOK_URL"]
ROLE_TO_PING_FOR_ATTENTION = secrets["ROLE_TO_PING_FOR_ATTENTION"]

# Emoji placed in front of each Discord message, keyed by logging level.
EMOJIS = {
    logging.DEBUG: "🔌",
    logging.INFO: "ℹ️",
    logging.WARNING: "⚠️",
    logging.ERROR: "💢",
    logging.CRITICAL: "🆘",
}

# Number of message characters put into one Discord message chunk
# (the indicators below are added on top of this).
MESSAGE_LENGTH_LIMIT_INDEX = 1997
# Appended to a chunk when another chunk follows it.
MESSAGE_LIMIT_SPLIT_INDICATOR = "⤵️"
# Prepended to every chunk after the first one.
MESSAGE_LIMIT_CONTINUE_INDICATOR = "↪️"
# https://github.com/CopterExpress/python-async-logging-handler/blob/master/async_logging_handler/__init__.py
# https://stackoverflow.com/questions/75090778/making-a-logging-handler-with-async-emit
class DiscordWebhookHandler(logging.Handler):
    """Logging handler that forwards formatted records to a Discord webhook.

    ``emit`` only puts the record on a queue; a daemon thread started in
    ``__init__`` takes records off the queue and performs the webhook
    calls, so the HTTP requests happen off the logging caller's thread.
    """

    def __init__(
        self,
        discord_webhook_url: str,
        level=0,
    ) -> None:
        """Create the handler and start its background consumer thread.

        Args:
            discord_webhook_url: URL the messages are posted to.
            level: Minimum level handled; passed to ``logging.Handler``.
        """
        super().__init__(level)
        self.discord_webhook_url = discord_webhook_url
        self.queue = Queue()
        # Daemon thread so it never keeps the interpreter alive on its own;
        # close() is the place that waits for pending records.
        self.thread = Thread(target=self.queue_consumer)
        self.thread.daemon = True
        self.thread.start()

    def emit(self, record: LogRecord) -> None:
        """Queue *record* for delivery by the consumer thread."""
        self.queue.put(record)

    def queue_consumer(self):
        """Loop forever, formatting and sending each queued record."""
        while True:
            record = self.queue.get()
            try:
                self.webhook_emit(self.format(record))
            except Exception:
                # Delivery is best effort: report the failure through
                # logging's standard handler error hook and keep consuming.
                self.handleError(record)
            finally:
                # Always balance get(), otherwise close() -> queue.join()
                # would wait forever on a record that failed.
                self.queue.task_done()

    def webhook_emit(self, message):
        """Post *message* to the webhook, one request per chunk."""
        for split_message in discord_message_limit_iter(message):
            DiscordWebhook(
                url=self.discord_webhook_url,
                rate_limit_retry=True,
                content=split_message,
            ).execute()

    def format(self, record: LogRecord) -> str:
        """Return the formatted record with a level emoji and optional ping.

        The emoji for the record's level is prefixed, and the attention
        role mention is appended for WARNING and above.  A level with no
        entry in ``EMOJIS`` (e.g. a custom level) uses the emoji of the
        nearest lower known level, or no emoji at all, instead of raising
        ``KeyError`` and losing the record.
        """
        emoji = EMOJIS.get(record.levelno)
        if emoji is None:
            lower_levels = [lvl for lvl in EMOJIS if lvl <= record.levelno]
            emoji = EMOJIS[max(lower_levels)] if lower_levels else ""
        ping = (
            ROLE_TO_PING_FOR_ATTENTION_str
            if record.levelno >= logging.WARNING
            else ""
        )
        return f"{emoji} {super().format(record)} {ping}"

    def close(self) -> None:
        """Wait until every queued record has been processed, then close."""
        logger.debug("Waiting for all logs to be sent to discord webhook")
        self.queue.join()
        super().close()
def discord_message_limit_iter(
    message: str,
    *,
    limit: "int | None" = None,
    split_indicator: "str | None" = None,
    continue_indicator: "str | None" = None,
):
    """Yield *message* as a sequence of chunks of at most *limit* characters.

    Each chunk is cut at the last newline inside its window when there is
    one (the newline itself is consumed by the split), otherwise exactly at
    *limit* characters.  No other character of *message* is dropped.  Every
    chunk except the first is prefixed with *continue_indicator*, and every
    chunk except the last is suffixed with *split_indicator*.  A message
    that fits in one chunk, including the empty string, is yielded unchanged.

    Args:
        message: Text to split.
        limit: Maximum number of message characters per chunk, not counting
            the indicators.  Defaults to ``MESSAGE_LENGTH_LIMIT_INDEX``.
        split_indicator: Suffix marking that more chunks follow.  Defaults
            to ``MESSAGE_LIMIT_SPLIT_INDICATOR``.
        continue_indicator: Prefix marking a continuation chunk.  Defaults
            to ``MESSAGE_LIMIT_CONTINUE_INDICATOR``.

    Raises:
        ValueError: If *limit* is smaller than 1.
    """
    if limit is None:
        limit = MESSAGE_LENGTH_LIMIT_INDEX
    if split_indicator is None:
        split_indicator = MESSAGE_LIMIT_SPLIT_INDICATOR
    if continue_indicator is None:
        continue_indicator = MESSAGE_LIMIT_CONTINUE_INDICATOR
    if limit < 1:
        raise ValueError("limit must be at least 1")
    # NOTE(review): the indicators are added on top of `limit`; confirm that
    # limit + both indicator lengths stays within the webhook's content limit.

    total = len(message)
    start = 0
    is_first_message = True
    while total - start > limit:
        end = start + limit
        # Search from start + 1 so a newline sitting at the very start of
        # the window cannot produce an empty chunk.
        newline_index = message.rfind("\n", start + 1, end)
        if newline_index == -1:
            # One long line: hard split; the next chunk resumes exactly at
            # `end` so the character there is not skipped.
            chunk, next_start = message[start:end], end
        else:
            # Keep everything up to (not including) the newline and resume
            # right after it.
            chunk, next_start = message[start:newline_index], newline_index + 1
        prefix = "" if is_first_message else continue_indicator
        yield f"{prefix}{chunk}{split_indicator}"
        start = next_start
        is_first_message = False

    # Whatever remains fits in a single chunk.
    prefix = "" if is_first_message else continue_indicator
    yield f"{prefix}{message[start:]}"
def discord_ping_role(role_id: int):
    """Return the ``<@&id>`` string used to ping the role *role_id*."""
    return "<@&{}>".format(role_id)
async def webhook_file_upload(files: list[str]):
    """Upload the given files to the Discord webhook in one message.

    The message content is the current local timestamp.  A file that
    cannot be opened or read is logged with its traceback and skipped, so
    one bad path does not stop the remaining files from being sent.

    Args:
        files: Paths of the files to attach; each is attached under its
            base name.
    """
    webhook = AsyncDiscordWebhook(
        url=DISCORD_WEBHOOK_URL,
        rate_limit_retry=True,
        content=datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f"),
    )
    for file in files:
        try:
            with open(file, "rb") as f:
                logger.debug("Loading file %s for webhook upload", file)
                webhook.add_file(f.read(), os.path.basename(file))
        except OSError:
            # OSError covers a missing file (FileNotFoundError) as well as
            # permission errors or a path that is a directory.
            logger.exception("File upload error, path %s", file)
    await webhook.execute()
    logger.debug("Attempted to upload %d files to webhook", len(files))
# Pre-rendered mention of the attention role; DiscordWebhookHandler.format
# appends it to messages of level WARNING and above.
ROLE_TO_PING_FOR_ATTENTION_str = discord_ping_role(ROLE_TO_PING_FOR_ATTENTION)
# Application logger named after CODENAME; setup_logging() attaches its handlers.
logger = logging.getLogger(CODENAME)
def setup_logging():
    """Attach console, file and Discord webhook handlers to ``logger``.

    The logger itself and the console/file handlers accept DEBUG and up;
    the Discord webhook handler only accepts INFO and up.  The console and
    Discord handlers share the short time-of-day format, while the log file
    uses the default timestamp format.
    """
    short_format = logging.Formatter(
        "%(asctime)s,%(msecs)03d %(levelname).1s %(message)s", "%H:%M:%S"
    )
    logfile_format = logging.Formatter("%(asctime)s %(levelname).1s %(message)s")

    def attach(handler, level, formatter):
        # Configure one handler and register it on the application logger.
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    logger.setLevel(logging.DEBUG)
    # Each handler is constructed and attached in turn: console, file, Discord.
    attach(StreamHandler(), logging.DEBUG, short_format)
    attach(FileHandler(f"{CODENAME}.log"), logging.DEBUG, logfile_format)
    attach(DiscordWebhookHandler(DISCORD_WEBHOOK_URL), logging.INFO, short_format)
async def main():
    """Configure logging, then emit one INFO record through the handlers."""
    setup_logging()
    logger.info("ME MAY SIEU BEO")


# Run main() only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())