-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetric_collector.py
113 lines (100 loc) · 5.07 KB
/
metric_collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# metric_collector.py
import importlib.util
import os
import time
import logging
logger = logging.getLogger(__name__)
class MetricCollector:
def __init__(self, config_manager):
self.config_manager = config_manager
self.metrics_modules = {}
self.last_collection_times = {}
self.load_metric_modules()
def load_metric_modules(self):
for filename in os.listdir(self.config_manager.metrics_dir):
if filename.endswith('.py'):
module_name = filename[:-3]
module_path = os.path.join(self.config_manager.metrics_dir, filename)
try:
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, 'collect'):
self.metrics_modules[module_name] = module
logger.info(f"Loaded metric module: {module_name}")
else:
logger.warning(f"Metric module {module_name} does not have a 'collect' function")
except Exception as e:
logger.error(f"Error loading metric module {module_name}: {str(e)}")
def reload_active_metrics(self):
self.config_manager.active_metrics = [
metric for metric in self.config_manager.active_metrics
if metric in self.metrics_modules
]
logger.info(f"Active metrics reloaded: {self.config_manager.active_metrics}")
def get_metric_interval(self, metric_name):
return self.config_manager.metric_intervals.get(metric_name, self.config_manager.default_interval)
def collect_metrics(self):
current_time = time.time()
collected_metrics = {}
for name in self.config_manager.active_metrics:
if name in self.metrics_modules:
module = self.metrics_modules[name]
interval = self.get_metric_interval(name)
last_collection = self.last_collection_times.get(name, 0)
if current_time - last_collection >= interval:
try:
value = module.collect()
collected_metrics[name] = {'value': value, 'timestamp': current_time}
self.last_collection_times[name] = current_time
logger.info(f"Collected metric {name}: {value}")
except Exception as e:
logger.error(f"Error collecting metric {name}: {e}", exc_info=True)
return collected_metrics
def get_shortest_interval(self):
if not self.config_manager.active_metrics:
return self.config_manager.default_interval
return min(self.get_metric_interval(metric) for metric in self.config_manager.active_metrics)
def update_metric_script(self, metric_name, metric_code):
file_path = os.path.join(self.config_manager.metrics_dir, f"{metric_name}.py")
try:
with open(file_path, 'w') as f:
f.write(metric_code)
logger.info(f"Updated metric script: {metric_name}")
self.reload_metric_module(metric_name)
except Exception as e:
logger.error(f"Error updating metric script {metric_name}: {str(e)}")
def reload_metric_module(self, metric_name):
file_path = os.path.join(self.config_manager.metrics_dir, f"{metric_name}.py")
try:
spec = importlib.util.spec_from_file_location(metric_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, 'collect'):
self.metrics_modules[metric_name] = module
logger.info(f"Reloaded metric module: {metric_name}")
else:
logger.warning(f"Reloaded metric module {metric_name} does not have a 'collect' function")
except Exception as e:
logger.error(f"Error reloading metric module {metric_name}: {str(e)}")
def remove_metric_script(self, metric_name):
file_path = os.path.join(self.config_manager.metrics_dir, f"{metric_name}.py")
try:
os.remove(file_path)
self.metrics_modules.pop(metric_name, None)
self.last_collection_times.pop(metric_name, None)
logger.info(f"Removed metric script: {metric_name}")
except Exception as e:
logger.error(f"Error removing metric script {metric_name}: {str(e)}")
def list_available_metrics(self):
return list(self.metrics_modules.keys())
def get_metric_info(self, metric_name):
if metric_name in self.metrics_modules:
module = self.metrics_modules[metric_name]
return {
"name": metric_name,
"interval": self.get_metric_interval(metric_name),
"last_collection": self.last_collection_times.get(metric_name, 0),
"description": getattr(module, 'description', 'No description available')
}
return None