Commit c4b3b78

Use influxdb-client v2 API

bnavigator committed Oct 2, 2024
1 parent 3974d45 commit c4b3b78
Showing 2 changed files with 110 additions and 68 deletions.
144 changes: 87 additions & 57 deletions metrics.py
@@ -1,26 +1,27 @@
#!/usr/bin/python3

import argparse
-from collections import namedtuple
-from datetime import datetime
-from dateutil.parser import parse as date_parse
-from influxdb import InfluxDBClient
-from lxml import etree as ET
import os
import subprocess
import sys
-import yaml
+from collections import namedtuple
+from datetime import datetime

-import metrics_release
import osc.conf
import osc.core
-from osc.core import HTTPError
-from osc.core import get_commitlog
+import yaml
+from dateutil.parser import parse as date_parse
+from influxdb_client import InfluxDBClient
+from influxdb_client.client.write_api import SYNCHRONOUS
+from lxml import etree as ET
+from osc.core import HTTPError, get_commitlog
+
+import metrics_release
import osclib.conf
from osclib.cache import Cache
from osclib.conf import Config
-from osclib.core import get_request_list_with_history
-from osclib.core import project_pseudometa_package
+from osclib.core import (get_request_list_with_history,
+                         project_pseudometa_package)
from osclib.stagingapi import StagingAPI

SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))
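
The core of the migration is visible in the imports above: the v1 influxdb package exposed a single InfluxDBClient with query/write/drop methods, while the v2 influxdb_client package splits those concerns into separate API objects obtained from the client. A minimal sketch of the v2 setup, with placeholder connection values rather than this script's real arguments:

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Placeholder URL, credentials, and org for illustration only.
with InfluxDBClient(url="http://localhost:8086",
                    username="user", password="pass",
                    org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)  # blocking writes
    query_api = client.query_api()      # Flux queries
    delete_api = client.delete_api()    # predicate-based deletes
    buckets_api = client.buckets_api()  # bucket management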
@@ -102,7 +103,7 @@ def timestamp(datetime):
    return int(datetime.strftime('%s'))


-def ingest_requests(api, project):
+def ingest_requests(client, api, project):
    requests = get_request_list_with_history(
        api.apiurl, project, req_state=('accepted', 'revoked', 'superseded'))
    for request in requests:
@@ -249,7 +250,7 @@ def ingest_requests(api, project):
            print(f"unable to find priority history entry for {request.get('id')} to {priority.text}")

    print(f'finalizing {len(points):,} points')
-    return walk_points(points, project)
+    return walk_points(client, points, project)


def who_workaround(request, review, relax=False):
@@ -283,9 +284,9 @@ def who_workaround(request, review, relax=False):
# allocating memory for entire incoming data set at once.


-def walk_points(points, target):
-    global client
-
+def walk_points(client, points, target):
+    delete_api = client.delete_api()
+    write_api = client.write_api(write_options=SYNCHRONOUS)
    measurements = set()
    counters = {}
    final = []
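
The explicit SYNCHRONOUS write options matter: the v2 client's default write API batches points in a background buffer and only guarantees delivery once it is flushed or closed, which is easy to miss in a short-lived script. A sketch of the difference, assuming an already-constructed client and a points list:

# SYNCHRONOUS: each write() blocks until the server acknowledges it.
write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket="example-bucket", record=points, write_precision='s')

# Default (batching): writes are buffered in the background;
# close() flushes the buffer before the process exits.
batching_api = client.write_api()
batching_api.write(bucket="example-bucket", record=points, write_precision='s')
batching_api.close()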
@@ -294,14 +295,17 @@ def walk_points(points, target):
    for point in sorted(points, key=lambda p: p.time):
        if point.measurement not in measurements:
            # Wait until just before writing to drop measurement.
-            client.drop_measurement(point.measurement)
+            delete_api.delete(start="1970-01-01T00:00:00Z",
+                              stop=datetime.utcnow().isoformat() + "Z",
+                              bucket=target,
+                              predicate=f'_measurement="{point.measurement}"')
            measurements.add(point.measurement)

        if point.time != time_last and len(final) >= 1000:
            # Write final point in batches of ~1000, but guard against writing
            # when in the middle of points at the same time as they may end up
            # being merged. As such the previous time should not match current.
-            client.write_points(final, 's')
+            write_api.write(bucket=target, record=final, write_precision='s')
            wrote += len(final)
            final = []
            time_last = point.time
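
InfluxDB 2.x has no DROP MEASUREMENT statement, so the hunk above emulates v1's drop_measurement() by deleting every point of the measurement over an effectively unbounded time range. A condensed sketch of the idiom with an illustrative bucket and measurement name; a timezone-aware now() gives the same RFC 3339 stop timestamp without the deprecated utcnow():

from datetime import datetime, timezone

delete_api = client.delete_api()
delete_api.delete(
    start="1970-01-01T00:00:00Z",                 # beginning of time
    stop=datetime.now(timezone.utc).isoformat(),  # now, RFC 3339 with offset
    bucket="example-bucket",
    predicate='_measurement="some_measurement"',
)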
@@ -333,11 +337,11 @@ def walk_points(points, target):
        point['fields'].update(counters_tag['values'])

    # Write any remaining final points.
-    client.write_points(final, 's')
+    write_api.write(bucket=target, record=final, write_precision='s')
    return wrote + len(final)


-def ingest_release_schedule(project):
+def ingest_release_schedule(client, project):
    points = []
    release_schedule = {}
    release_schedule_file = os.path.join(SOURCE_DIR, f'metrics/annotation/{project}.yaml')
@@ -363,8 +367,13 @@ def ingest_release_schedule(project):
            'time': timestamp(date),
        })

-    client.drop_measurement('release_schedule')
-    client.write_points(points, 's')
+    delete_api = client.delete_api()
+    delete_api.delete(start="1970-01-01T00:00:00Z",
+                      stop=datetime.utcnow().isoformat() + "Z",
+                      bucket=project,
+                      predicate='_measurement="release_schedule"')
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+    write_api.write(bucket=project, record=points, write_precision='s')
    return len(points)
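
write_api.write() accepts the same dictionary-shaped points that v1's write_points() did, which is why the point-building code elsewhere in this file is untouched; only the call sites change. A sketch of one such record, with illustrative values:

point = {
    'measurement': 'release_schedule',
    'tags': {'project': 'openSUSE:Factory'},
    'fields': {'version': '20241002'},
    'time': 1700000000,  # epoch seconds, matching write_precision='s'
}
write_api.write(bucket="example-bucket", record=[point], write_precision='s')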


@@ -439,10 +448,18 @@ def dashboard_at_changed(api, filename, revision=None):

def ingest_dashboard_config(content):
    if not hasattr(ingest_dashboard_config, 'previous'):
-        result = client.query('SELECT * FROM dashboard_config ORDER BY time DESC LIMIT 1')
-        if result:
+        query_api = ingest_dashboard_config.client.query_api()
+        result = query_api.query(query=f'''
+            from(bucket: "{ingest_dashboard_config.project}")
+            |> range(start: -100y)
+            |> filter(fn: (r) => r._measurement == "dashboard_config")
+            |> filter(fn: (r) => r._field == "revision")
+            |> sort(columns: ["_time"], desc: true)
+            |> limit(n: 1)
+        ''')
+        if result and result[0].records:
            # Extract last point and remove zero values since no need to fill.
-            point = next(result.get_points())
+            point = result[0].records[0].values
            point = {k: v for (k, v) in point.items() if k != 'time' and v != 0}
            ingest_dashboard_config.previous = set(point.keys())
        else:
@@ -488,25 +505,34 @@ def ingest_dashboard_version_snapshot(content):
    }


-def ingest_dashboard_revision_get():
-    result = client.query('SELECT revision FROM dashboard_revision ORDER BY time DESC LIMIT 1')
-    if result:
-        return next(result.get_points())['revision']
+def ingest_dashboard_revision_get(client, project):
+    query_api = client.query_api()
+    result = query_api.query(query=f'''
+        from(bucket: "{project}")
+        |> range(start: -100y)
+        |> filter(fn: (r) => r._measurement == "dashboard_revision")
+        |> filter(fn: (r) => r._field == "revision")
+        |> sort(columns: ["_time"], desc: true)
+        |> limit(n: 1)''')
+    if result and result[0].records:
+        return result[0].records[0].get_value()

    return None


-def ingest_dashboard(api):
+def ingest_dashboard(client, project, api):
    index = revision_index(api)

-    revision_last = ingest_dashboard_revision_get()
+    revision_last = ingest_dashboard_revision_get(client, project)
    past = True if revision_last is None else False
    print('dashboard ingest: processing {:,} revisions starting after {}'.format(
        len(index), 'the beginning' if past else revision_last))

    filenames = ['config', 'repo_checker', 'version_snapshot']
    if api.project == 'openSUSE:Factory':
        filenames.append('devel_projects')
+    ingest_dashboard_config.client = client
+    ingest_dashboard_config.project = api.project

    count = 0
    points = []
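
Both Flux queries above reimplement v1's SELECT ... ORDER BY time DESC LIMIT 1. Flux has a dedicated last() function that expresses the same intent more directly; a sketch of that alternative, with an illustrative bucket name:

tables = query_api.query(query='''
    from(bucket: "example-bucket")
      |> range(start: 0)
      |> filter(fn: (r) => r._measurement == "dashboard_revision")
      |> filter(fn: (r) => r._field == "revision")
      |> last()
''')
revision = tables[0].records[0].get_value() if tables and tables[0].records else None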
@@ -554,40 +580,44 @@ def ingest_dashboard(api):


def main(args):
-    global client
-    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)
+    with InfluxDBClient(url=f"http://{args.host}:{args.port}",
+                        username=args.user,
+                        password=args.password,
+                        org="openSUSE-release-tools-metrics") as client:

-    osc.conf.get_config(override_apiurl=args.apiurl)
-    apiurl = osc.conf.config['apiurl']
-    osc.conf.config['debug'] = args.debug
+        osc.conf.get_config(override_apiurl=args.apiurl)
+        apiurl = osc.conf.config['apiurl']
+        osc.conf.config['debug'] = args.debug

-    # Ensure database exists.
-    client.create_database(client._database)
+        # Ensure database exists.
+        buckets_api = client.buckets_api()
+        buckets_api.create_bucket(bucket_name=args.project)

-    metrics_release.ingest(client)
-    if args.release_only:
-        return
+        metrics_release.ingest(client, bucketname=args.project)
+        if args.release_only:
+            return

-    # Use separate cache since it is persistent.
-    _, package = project_pseudometa_package(apiurl, args.project)
-    if args.wipe_cache:
-        Cache.delete_all()
-    if args.heavy_cache:
-        Cache.PATTERNS[r'/search/request'] = sys.maxsize
-        Cache.PATTERNS[r'/source/[^/]+/{}/_history'.format(package)] = sys.maxsize
-        Cache.PATTERNS[r'/source/[^/]+/{}/[^/]+\?rev=.*'.format(package)] = sys.maxsize
-    Cache.init('metrics')
+        # Use separate cache since it is persistent.
+        _, package = project_pseudometa_package(apiurl, args.project)
+        if args.wipe_cache:
+            Cache.delete_all()
+        if args.heavy_cache:
+            Cache.PATTERNS[r'/search/request'] = sys.maxsize
+            Cache.PATTERNS[r'/source/[^/]+/{}/_history'.format(package)] = sys.maxsize
+            Cache.PATTERNS[r'/source/[^/]+/{}/[^/]+\?rev=.*'.format(package)] = sys.maxsize
+        Cache.init('metrics')

-    Config(apiurl, args.project)
-    api = StagingAPI(apiurl, args.project)
+        Config(apiurl, args.project)
+        api = StagingAPI(apiurl, args.project)

-    print(f'dashboard: wrote {ingest_dashboard(api):,} points')
+        print(f'dashboard: wrote {ingest_dashboard(client, args.project, api):,} points')

-    global who_workaround_swap, who_workaround_miss
-    who_workaround_swap = who_workaround_miss = 0
+        global who_workaround_swap, who_workaround_miss
+        who_workaround_swap = who_workaround_miss = 0

-    points_requests = ingest_requests(api, args.project)
-    points_schedule = ingest_release_schedule(args.project)
+        points_requests = ingest_requests(client, api, args.project)
+        points_schedule = ingest_release_schedule(client, args.project)

+        print('who_workaround_swap', who_workaround_swap)
+        print('who_workaround_miss', who_workaround_miss)
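
One behavioral difference hides in the bucket setup: v1's create_database() was idempotent, but v2's create_bucket() raises an ApiException when the bucket already exists. A guarded sketch, with an illustrative bucket name:

from influxdb_client.rest import ApiException

buckets_api = client.buckets_api()
if buckets_api.find_bucket_by_name("example-bucket") is None:
    try:
        buckets_api.create_bucket(bucket_name="example-bucket")
    except ApiException:
        pass  # bucket was created concurrently; safe to continue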
34 changes: 23 additions & 11 deletions metrics_release.py
@@ -1,9 +1,12 @@
-from dateutil.parser import parse as date_parse
-from metrics import timestamp
-import requests
-from urllib.parse import urljoin
+from datetime import datetime
+
+import requests
import yaml
+from dateutil.parser import parse as date_parse
+from influxdb_client.client.write_api import SYNCHRONOUS
+
+from metrics import timestamp

BASEURL = 'http://review.tumbleweed.boombatower.com/data/'

@@ -13,12 +16,21 @@ def data_load(name):
    return yaml.safe_load(response.text)


-def data_write(client, measurement, points):
-    client.drop_measurement(measurement)
-    client.write_points(points, 's')
+def data_drop(client, bucketname, measurement):
+    start = "1970-01-01T00:00:00Z"
+    stop = datetime.utcnow().isoformat() + "Z"
+    delete_api = client.delete_api()
+    delete_api.delete(start=start, stop=stop, bucket=bucketname,
+                      predicate=f'_measurement="{measurement}"')
+
+
+def data_write(client, bucketname, measurement, points):
+    data_drop(client, bucketname, measurement)
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+    write_api.write(bucket=bucketname, record=points, write_precision='s')


-def ingest_data(client, name):
+def ingest_data(client, bucketname, name):
    data = data_load(name)

    measurement = f'release_{name}'
@@ -31,7 +43,7 @@ def ingest_data(client, name):
            'time': timestamp(date_parse(release)),
        })

-    data_write(client, measurement, points)
+    data_write(client, bucketname, measurement, points)
    print(f'wrote {len(points)} for {name}')


@@ -59,10 +71,10 @@ def map_snapshot(details):
    }


-def ingest(client):
-    if client._database != 'openSUSE:Factory':
+def ingest(client, bucketname):
+    if bucketname != 'openSUSE:Factory':
        print('skipping release ingest for unsupported project')
        return

    for name in ['bug', 'mail', 'score', 'snapshot']:
-        ingest_data(client, name)
+        ingest_data(client, bucketname, name)
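
data_write() builds a fresh write API on every call; with SYNCHRONOUS options that is harmless since there is no background buffer to leak, but a caller ingesting many measurements could hoist it. A sketch of that variant (data_write_many is a hypothetical helper, not part of this commit):

def data_write_many(client, bucketname, measurement_points):
    # Drop and rewrite several measurements, sharing one write API.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    for measurement, points in measurement_points.items():
        data_drop(client, bucketname, measurement)
        write_api.write(bucket=bucketname, record=points, write_precision='s')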
