-
Notifications
You must be signed in to change notification settings - Fork 539
/
Copy pathkernel.py
128 lines (102 loc) · 3.81 KB
/
kernel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import asyncio
import subprocess
import platform
import os
import shutil
from .logs import setup_logging
from .logs import logger
setup_logging()
# dmesg process created at boot time
dmesg_proc = None
def get_kernel_messages():
"""
Is this the way to do this?
"""
current_platform = platform.system()
if current_platform == "Darwin":
process = subprocess.Popen(
["syslog"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
)
output, _ = process.communicate()
return output.decode("utf-8")
elif current_platform == "Linux":
log_path = get_dmesg_log_path()
with open(log_path, 'r') as file:
return file.read()
else:
logger.info("Unsupported platform.")
def get_dmesg_log_path():
"""
Check for the existence of a readable dmesg log file and return its path.
Create an accessible path if not found.
"""
if os.access('/var/log/dmesg', os.F_OK | os.R_OK):
return '/var/log/dmesg'
global dmesg_proc
dmesg_log_path = '/tmp/dmesg'
if dmesg_proc:
return dmesg_log_path
logger.info("Created /tmp/dmesg.")
subprocess.run(['touch', dmesg_log_path])
dmesg_path = shutil.which('dmesg')
if dmesg_path:
logger.info(f"Writing to {dmesg_log_path} from dmesg.")
dmesg_proc = subprocess.Popen([dmesg_path, '--follow'], text=True, stdout=subprocess.PIPE)
subprocess.Popen(['tee', dmesg_log_path], text=True, stdin=dmesg_proc.stdout, stdout=subprocess.DEVNULL)
return dmesg_log_path
def custom_filter(message):
# Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern
if "{TO_INTERPRETER{" in message and "}TO_INTERPRETER}" in message:
start = message.find("{TO_INTERPRETER{") + len("{TO_INTERPRETER{")
end = message.find("}TO_INTERPRETER}", start)
return message[start:end]
# Check for USB mention
# elif 'USB' in message:
# return message
# # Check for network related keywords
# elif any(keyword in message for keyword in ['network', 'IP', 'internet', 'LAN', 'WAN', 'router', 'switch']) and "networkStatusForFlags" not in message:
# return message
else:
return None
last_messages = ""
def check_filtered_kernel():
messages = get_kernel_messages()
if messages is None:
return "" # Handle unsupported platform or error in fetching kernel messages
global last_messages
messages.replace(last_messages, "")
messages = messages.split("\n")
filtered_messages = []
for message in messages:
if custom_filter(message):
filtered_messages.append(message)
return "\n".join(filtered_messages)
async def put_kernel_messages_into_queue(queue):
while True:
text = check_filtered_kernel()
if text:
if isinstance(queue, asyncio.Queue):
await queue.put({"role": "computer", "type": "console", "start": True})
await queue.put(
{
"role": "computer",
"type": "console",
"format": "output",
"content": text,
}
)
await queue.put({"role": "computer", "type": "console", "end": True})
else:
queue.put({"role": "computer", "type": "console", "start": True})
queue.put(
{
"role": "computer",
"type": "console",
"format": "output",
"content": text,
}
)
queue.put({"role": "computer", "type": "console", "end": True})
await asyncio.sleep(5)