Skip to content

feat: Add new Prometheus target and metadata API endpoints #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions prometheus_api_client/prometheus_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,141 @@ def get_metric_aggregation(
else:
raise TypeError("Invalid operation: " + operation)
return aggregated_values


def get_scrape_pools(self) -> list[str]:
"""
Get a list of all scrape pools in activeTargets.
"""
scrape_pools = []
for target in self.get_targets()['activeTargets']:
scrape_pools.append(target['scrapePool'])
return list(set(scrape_pools))

def get_targets(self, state: str = None, scrape_pool: str = None):
"""
Get a list of all targets from Prometheus.

:param state: (str) Optional filter for target state ('active', 'dropped', 'any').
If None, returns both active and dropped targets.
:param scrape_pool: (str) Optional filter by scrape pool name
:returns: (dict) A dictionary containing active and dropped targets
:raises:
(RequestException) Raises an exception in case of a connection error
(PrometheusApiClientException) Raises in case of non 200 response status code
"""
params = {}
if state:
params['state'] = state
if scrape_pool:
params['scrapePool'] = scrape_pool

response = self._session.get(
"{0}/api/v1/targets".format(self.url),
verify=self._session.verify,
headers=self.headers,
params=params,
auth=self.auth,
cert=self._session.cert,
timeout=self._timeout,
)

if response.status_code == 200:
return response.json()["data"]
else:
raise PrometheusApiClientException(
"HTTP Status Code {} ({!r})".format(
response.status_code, response.content)
)

def get_target_metadata(self, target: dict[str, str], metric: str = None):
"""
Get metadata about metrics from a specific target.

:param target: (dict) A dictionary containing target labels to match against (e.g. {'job': 'prometheus'})
:param metric: (str) Optional metric name to filter metadata
:returns: (list) A list of metadata entries for matching targets
:raises:
(RequestException) Raises an exception in case of a connection error
(PrometheusApiClientException) Raises in case of non 200 response status code
"""
params = {}

# Convert target dict to label selector string
if metric:
params['metric'] = metric

if target:
match_target = "{" + \
",".join(f'{k}="{v}"' for k, v in target.items()) + "}"
params['match_target'] = match_target

response = self._session.get(
"{0}/api/v1/targets/metadata".format(self.url),
verify=self._session.verify,
headers=self.headers,
params=params,
auth=self.auth,
cert=self._session.cert,
timeout=self._timeout,
)

if response.status_code == 200:
return response.json()["data"]
else:
raise PrometheusApiClientException(
"HTTP Status Code {} ({!r})".format(
response.status_code, response.content)
)

def get_metric_metadata(self, metric: str, limit: int = None, limit_per_metric: int = None):
"""
Get metadata about metrics.

:param metric: (str) Optional metric name to filter metadata
:param limit: (int) Optional maximum number of metrics to return
:param limit_per_metric: (int) Optional maximum number of metadata entries per metric
:returns: (dict) A dictionary mapping metric names to lists of metadata entries in format:
{'metric_name': [{'type': str, 'help': str, 'unit': str}, ...]}
:raises:
(RequestException) Raises an exception in case of a connection error
(PrometheusApiClientException) Raises in case of non 200 response status code
"""
params = {}

if metric:
params['metric'] = metric

if limit:
params['limit'] = limit

if limit_per_metric:
params['limit_per_metric'] = limit_per_metric

response = self._session.get(
"{0}/api/v1/metadata".format(self.url),
verify=self._session.verify,
headers=self.headers,
params=params,
auth=self.auth,
cert=self._session.cert,
timeout=self._timeout,
)

if response.status_code == 200:
data = response.json()["data"]
formatted_data = []
for k, v in data.items():
for v_ in v:
formatted_data.append({
"metric_name": k,
"type": v_.get('type', 'unknown'),
"help": v_.get('help', ''),
"unit": v_.get('unit', '')
})
return formatted_data
else:
raise PrometheusApiClientException(
"HTTP Status Code {} ({!r})".format(
response.status_code, response.content)
)
73 changes: 72 additions & 1 deletion tests/test_prometheus_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ def test_get_metric_aggregation(self): # noqa D102
def test_get_metric_aggregation_with_incorrect_input_types(self): # noqa D102
with self.assertRaises(TypeError, msg="operations accepted invalid value type"):
_ = self.pc.get_metric_aggregation(query="up", operations="sum")

def test_retry_on_error(self): # noqa D102
retry = Retry(total=3, backoff_factor=0.1, status_forcelist=[400])
pc = PrometheusConnect(url=self.prometheus_host, disable_ssl=True, retry=retry)
Expand All @@ -140,6 +139,75 @@ def test_get_label_names_method(self): # noqa D102
self.assertEqual(len(labels), 4)
self.assertEqual(labels, ["__name__", "env", "instance", "job"])

def test_get_scrape_pools(self): # noqa D102
scrape_pools = self.pc.get_scrape_pools()
self.assertIsInstance(scrape_pools, list)
self.assertTrue(len(scrape_pools) > 0, "no scrape pools found")
self.assertIsInstance(scrape_pools[0], str)

def test_get_targets(self): # PR #295
targets = self.pc.get_targets()
self.assertIsInstance(targets, dict)
self.assertIn('activeTargets', targets)
self.assertIsInstance(targets['activeTargets'], list)

# Test with state filter
active_targets = self.pc.get_targets(state='active')
self.assertIsInstance(active_targets, dict)
self.assertIn('activeTargets', active_targets)

# Test with scrape_pool filter
if len(scrape_pools := self.pc.get_scrape_pools()) > 0:
pool_targets = self.pc.get_targets(scrape_pool=scrape_pools[0])
self.assertIsInstance(pool_targets, dict)

def test_get_target_metadata(self): # PR #295
# Get a target to test with
targets = self.pc.get_targets()
if len(targets['activeTargets']) > 0:
target = {
'job': targets['activeTargets'][0]['labels']['job']
}
metadata = self.pc.get_target_metadata(target)
self.assertIsInstance(metadata, list)

# Test with metric filter
if len(metadata) > 0:
metric_name = metadata[0]['metric']
filtered_metadata = self.pc.get_target_metadata(
target, metric=metric_name)
self.assertIsInstance(filtered_metadata, list)
self.assertTrue(
all(item['target']['job'] == target['job'] for item in filtered_metadata))


def test_get_metric_metadata(self): # PR #295
metadata = self.pc.get_metric_metadata(metric=None)
self.assertIsInstance(metadata, list)
self.assertTrue(len(metadata) > 0, "no metric metadata found")

# Check structure of metadata
self.assertIn('metric_name', metadata[0])
self.assertIn('type', metadata[0])
self.assertIn('help', metadata[0])
self.assertIn('unit', metadata[0])

# Test with specific metric
if len(metadata) > 0:
metric_name = metadata[0]['metric_name']
filtered_metadata = self.pc.get_metric_metadata(metric=metric_name)
self.assertIsInstance(filtered_metadata, list)
self.assertTrue(
all(item['metric_name'] == metric_name for item in filtered_metadata))

# Test with limit
limited_metadata = self.pc.get_metric_metadata(metric_name, limit=1)
self.assertLessEqual(len(limited_metadata), 1)

# Test with limit_per_metric
limited_per_metric = self.pc.get_metric_metadata(metric_name, limit_per_metric=1)
self.assertIsInstance(limited_per_metric, list)


class TestPrometheusConnectWithMockedNetwork(BaseMockedNetworkTestcase):
"""Network is blocked in this testcase, see base class."""
Expand Down Expand Up @@ -233,3 +301,6 @@ def test_get_label_values_method(self): # noqa D102
self.assertEqual(handler.call_count, 1)
request = handler.requests[0]
self.assertEqual(request.path_url, "/api/v1/label/label_name/values")

if __name__ == "__main__":
unittest.main()