-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcgroup.py
163 lines (135 loc) · 5.55 KB
/
cgroup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# (c) 2020–2021 Vladimír Štill <[email protected]>
from typing import Optional, List, Set, Generator
import os
import os.path
import contextlib
import sys
import signal
from limit import Limit
class CGException(Exception):
pass
class CGControl:
CPU_MAX_PERIOD = 10000 # 10ms
def __init__(self) -> None:
cgpath: Optional[str] = None
basepath: Optional[str] = None
try:
with open("/proc/self/cgroup") as cginfo:
for line in cginfo:
n0, _, path = line.split(":")
n = int(n0)
if n != 0:
raise CGException("cgroups v2 not fully available")
cgpath = path.strip()[1:]
except OSError:
raise CGException("Could not read cgroup info from /proc")
if cgpath is None:
raise CGException("Could not detect process' cgroup, maybe no "
"cgroup support enabled?")
try:
with open("/etc/mtab") as mtab:
for line in mtab:
typ, path, _ = line.split(" ", 2)
if typ == "cgroup2" and basepath is None:
basepath = path
except OSError:
raise CGException("Could not read /etc/mtab to detect cgroup root")
if basepath is None:
raise CGException("Could not found cgroup hierarchy, maybe it is "
"not mounted")
self.cg = os.path.join(basepath, cgpath)
def delegate(self, moveTo: str) -> None:
"""
Moves the current processes to {moveTo} and enables subtree_controll
"""
self.subdivide(moveTo)
self.register(self.procs(), moveTo)
self.enable_available_subtrees()
def procs(self, subpath: str = ".") -> Set[int]:
with open(os.path.join(self.cg, subpath, "cgroup.procs")) as prcs:
return {int(p) for p in prcs}
def controllers(self, subpath: str = ".") -> Set[str]:
with open(os.path.join(self.cg, subpath, "cgroup.controllers")) \
as ctrlsF:
return set(ctrlsF.read().strip().split(' '))
def register(self, pids: Set[int], subpath: str = ".") \
-> None:
with open(os.path.join(self.cg, subpath, "cgroup.procs"), "w") \
as tgtPrcs:
for pid in pids:
tgtPrcs.write(str(pid))
def register_me(self, subpath: str = ".") -> None:
self.register({os.getpid()}, subpath)
def _write(self, path: str, key: str, value: str) -> None:
with open(os.path.join(self.cg, path, key), "w") as dst:
dst.write(value)
def enable_subtrees(self, controllers: Set[str],
subpath: str = ".") -> None:
self._write(subpath, "cgroup.subtree_control",
f"+{' +'.join(controllers)}")
def enable_available_subtrees(self, subpath: str = ".") -> None:
self.enable_subtrees(self.controllers(subpath), subpath)
def subdivide(self, subpath: str) -> str:
try:
os.makedirs(os.path.join(self.cg, subpath), exist_ok=True)
except Exception as ex:
raise CGException(f"Error creating sub-hierarchy: {ex}")
return subpath
def set_limit(self, limit: Limit, subpath: str = ".") -> None:
def wr(key: str, val: str, name: str) -> None:
try:
self._write(subpath, key, val)
except OSError as ex:
print(f"W: cgroup error: could not set {name} limit, "
f"ignoring it; error {ex}", file=sys.stderr, flush=True)
for key, val, name in [("memory.max", limit.memory, "memory"),
("memory.swap.max", limit.swap, "swap")]:
wr(key, str(val) if val is not None else "max", name)
wr("cpu.max",
f"{round(limit.cpu * self.CPU_MAX_PERIOD)} {self.CPU_MAX_PERIOD}"
if limit.cpu is not None else "max",
"CPU")
class SlotManager:
def __init__(self, limit: Limit) -> None:
self.cg: Optional[CGControl] = None
self._available = False
try:
self.cg = CGControl()
self.cg.delegate("master")
self.slaves = "slaves"
self.cg.subdivide(self.slaves)
self.cg.enable_available_subtrees(self.slaves)
self._free_slots: List[str] = []
self._slot_cnt = 0
self.limit = limit
self._available = True
except CGException as ex:
print(f"W: cgroup error: {ex}", file=sys.stderr, flush=True)
def available(self) -> bool:
return self._available
def _mkslot(self) -> str:
assert self.cg is not None
if self._free_slots:
return self._free_slots.pop()
slot_id = self._slot_cnt
self._slot_cnt += 1
slot = self.cg.subdivide(f"{self.slaves}/slot{slot_id}")
self.cg.set_limit(self.limit, slot)
return slot
def _terminate(self, slot: str) -> None:
assert self.cg is not None
for p in self.cg.procs(slot):
print(f"W: killing stale {p}", file=sys.stderr, flush=True)
os.kill(p, signal.SIGKILL)
self._free_slots.append(slot)
@contextlib.contextmanager
def get(self) -> Generator[Optional[str], None, None]:
if not self._available:
yield None
else:
slot = self._mkslot()
try:
yield slot
finally:
self._terminate(slot)
# vim: colorcolumn=80 expandtab sw=4 ts=4