Move metrics to InfluxDB v2 python client #3167

Open · wants to merge 3 commits into base: master
2 changes: 1 addition & 1 deletion .github/workflows/ci-test.yml
@@ -77,7 +77,7 @@ jobs:

       - name: FIXME, install missing dependencies
         run: |
-          zypper -n in python3-typing_extensions python3-solv python3-pika python3-openqa_client build python3-influxdb python3-bugzilla
+          zypper -n in python3-typing_extensions python3-solv python3-pika python3-openqa_client build python3-influxdb-client python3-bugzilla

       - name: run a simple smoke test whether --help actually works
         run: |
2 changes: 1 addition & 1 deletion dist/ci/testenv-tumbleweed/Dockerfile
@@ -7,7 +7,7 @@ RUN zypper --gpg-auto-import-keys ref

 RUN zypper in -y osc python3-pytest python3-httpretty python3-pyxdg python3-PyYAML \
     python3-pika python3-cmdln python3-lxml python3-python-dateutil python3-colorama \
-    python3-influxdb python3-pytest-cov libxml2-tools curl python3-flake8 python3-requests \
+    python3-influxdb-client python3-pytest-cov libxml2-tools curl python3-flake8 python3-requests \
     shadow vim vim-data strace git sudo patch unzip which cpio gawk openSUSE-release openSUSE-release-ftp \
     perl-Net-SSLeay perl-Text-Diff perl-XML-Simple perl-XML-Parser build \
     obs-service-download_files obs-service-format_spec_file obs-scm-bridge python3-GitPython
4 changes: 2 additions & 2 deletions dist/package/openSUSE-release-tools.spec
@@ -150,8 +150,8 @@ Requires(pre): shadow
 Suggests: grafana
 BuildArch: noarch
 %if 0%{?suse_version} > 1500
-Requires: influxdb
-Requires: python3-influxdb
+Requires: influxdb2
+Requires: python3-influxdb-client
 Requires: telegraf
 %else
 Suggests: influxdb
159 changes: 94 additions & 65 deletions metrics.py
@@ -1,26 +1,27 @@
 #!/usr/bin/python3

 import argparse
-from collections import namedtuple
-from datetime import datetime
-from dateutil.parser import parse as date_parse
-from influxdb import InfluxDBClient
-from lxml import etree as ET
 import os
 import subprocess
 import sys
-import yaml
+from collections import namedtuple
+from datetime import datetime

-import metrics_release
 import osc.conf
 import osc.core
-from osc.core import HTTPError
-from osc.core import get_commitlog
+import yaml
+from dateutil.parser import parse as date_parse
+from influxdb_client import InfluxDBClient
+from influxdb_client.client.write_api import SYNCHRONOUS
+from lxml import etree as ET
+from osc.core import HTTPError, get_commitlog

+import metrics_release
 import osclib.conf
 from osclib.cache import Cache
 from osclib.conf import Config
-from osclib.core import get_request_list_with_history
-from osclib.core import project_pseudometa_package
+from osclib.core import (get_request_list_with_history,
+                         project_pseudometa_package)
 from osclib.stagingapi import StagingAPI

 SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -102,7 +103,7 @@ def timestamp(datetime):
     return int(datetime.strftime('%s'))


-def ingest_requests(api, project):
+def ingest_requests(client, api, project):
     requests = get_request_list_with_history(
         api.apiurl, project, req_state=('accepted', 'revoked', 'superseded'))
     for request in requests:
@@ -249,7 +250,7 @@ def ingest_requests(api, project):
print(f"unable to find priority history entry for {request.get('id')} to {priority.text}")

print(f'finalizing {len(points):,} points')
return walk_points(points, project)
return walk_points(client, points, project)


def who_workaround(request, review, relax=False):
@@ -283,9 +284,9 @@ def who_workaround(request, review, relax=False):
 # allocating memory for entire incoming data set at once.


-def walk_points(points, target):
-    global client
-
+def walk_points(client, points, target):
+    delete_api = client.delete_api()
+    write_api = client.write_api(write_options=SYNCHRONOUS)
     measurements = set()
     counters = {}
     final = []
@@ -294,14 +295,17 @@ def walk_points(points, target):
     for point in sorted(points, key=lambda p: p.time):
         if point.measurement not in measurements:
             # Wait until just before writing to drop measurement.
-            client.drop_measurement(point.measurement)
+            delete_api.delete(start="1970-01-01T00:00:00Z",
+                              stop=datetime.utcnow().isoformat() + "Z",
+                              bucket=target,
+                              predicate=f'_measurement="{point.measurement}"')
             measurements.add(point.measurement)

         if point.time != time_last and len(final) >= 1000:
             # Write final point in batches of ~1000, but guard against writing
             # when in the middle of points at the same time as they may end up
             # being merged. As such the previous time should not match current.
-            client.write_points(final, 's')
+            write_api.write(bucket=target, record=final, write_precision='s')
             wrote += len(final)
             final = []
             time_last = point.time
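The two calls above are the v2 equivalents of the v1 `drop_measurement()` and `write_points()` idioms used throughout this file. A minimal, self-contained sketch of the same pattern (URL, token, org, bucket, and measurement names are placeholders, not values from this PR):

```python
from datetime import datetime

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
    # v2 equivalent of v1 drop_measurement(): delete every point of the
    # measurement by covering the full time range with a predicate.
    client.delete_api().delete(start='1970-01-01T00:00:00Z',
                               stop=datetime.utcnow().isoformat() + 'Z',
                               bucket='my-bucket',
                               predicate='_measurement="my_measurement"')

    # v2 equivalent of v1 write_points(points, 's'): a synchronous write of
    # dict-style records with second precision.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket='my-bucket', write_precision='s', record=[{
        'measurement': 'my_measurement',
        'tags': {'project': 'openSUSE:Factory'},
        'fields': {'value': 1},
        'time': 1700000000,
    }])
```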
@@ -333,11 +337,11 @@ def walk_points(points, target):
             point['fields'].update(counters_tag['values'])

     # Write any remaining final points.
-    client.write_points(final, 's')
+    write_api.write(bucket=target, record=final, write_precision='s')
     return wrote + len(final)


-def ingest_release_schedule(project):
+def ingest_release_schedule(client, project):
     points = []
     release_schedule = {}
     release_schedule_file = os.path.join(SOURCE_DIR, f'metrics/annotation/{project}.yaml')
@@ -363,8 +367,13 @@ def ingest_release_schedule(project):
             'time': timestamp(date),
         })

-    client.drop_measurement('release_schedule')
-    client.write_points(points, 's')
+    delete_api = client.delete_api()
+    delete_api.delete(start="1970-01-01T00:00:00Z",
+                      stop=datetime.utcnow().isoformat() + "Z",
+                      bucket=project,
+                      predicate='_measurement="release_schedule"')
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+    write_api.write(bucket=project, record=points, write_precision='s')
     return len(points)
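(The original hunk called `write_api(...)` rather than `write_api.write(...)`; `write_api()` would try to call the `WriteApi` object itself, so the `.write` is restored above.) One detail worth noting in these hunks: the v2 delete API takes a textual predicate rather than a measurement argument, with values in double quotes. A small sketch of predicate construction (the `project` tag is hypothetical):

```python
# Building a v2 delete predicate for a measurement held in a variable, as
# done above for walk_points() and 'release_schedule'.
measurement = 'release_schedule'
predicate = f'_measurement="{measurement}"'
# Predicates may combine conditions with AND, e.g. to scope by a tag:
predicate_scoped = f'_measurement="{measurement}" AND project="openSUSE:Factory"'
```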


@@ -439,10 +448,18 @@ def dashboard_at_changed(api, filename, revision=None):

 def ingest_dashboard_config(content):
     if not hasattr(ingest_dashboard_config, 'previous'):
-        result = client.query('SELECT * FROM dashboard_config ORDER BY time DESC LIMIT 1')
-        if result:
+        query_api = ingest_dashboard_config.client.query_api()
+        result = query_api.query(query=f'''
+            from(bucket: "{ingest_dashboard_config.project}")
+                |> range(start: -100y)
+                |> filter(fn: (r) => r._measurement == "dashboard_config")
+                |> filter(fn: (r) => r._field == "revision")
+                |> sort(columns: ["_time"], desc: true)
+                |> limit(n: 1)
+        ''')
+        if result and result[0].records:
             # Extract last point and remove zero values since no need to fill.
-            point = next(result.get_points())
+            point = result[0].records[0].values
             point = {k: v for (k, v) in point.items() if k != 'time' and v != 0}
             ingest_dashboard_config.previous = set(point.keys())
         else:
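Note the extraction fix above: `FluxRecord.get_value()` returns only the `_value` column, so iterating it as a dict cannot work (and `.iteritems()` is Python 2); `record.values` is the dict of all columns. For context on the result handling, `query_api.query()` returns a list of Flux tables, each holding `FluxRecord` objects whose `.values` dict also carries Flux metadata columns (`_measurement`, `_field`, `_time`, ...). A minimal traversal sketch, with connection details and bucket name illustrative:

```python
from influxdb_client import InfluxDBClient

with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
    query_api = client.query_api()
    tables = query_api.query(query='''
        from(bucket: "example-bucket")
            |> range(start: -1h)''')
    for table in tables:
        for record in table.records:
            # Convenience accessors for the common Flux columns; record.values
            # is a dict of every column in the row, metadata included.
            print(record.get_measurement(), record.get_field(),
                  record.get_value(), record.get_time())
```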
@@ -488,25 +505,34 @@ def ingest_dashboard_version_snapshot(content):
     }


-def ingest_dashboard_revision_get():
-    result = client.query('SELECT revision FROM dashboard_revision ORDER BY time DESC LIMIT 1')
-    if result:
-        return next(result.get_points())['revision']
+def ingest_dashboard_revision_get(client, project):
+    query_api = client.query_api()
+    result = query_api.query(query=f'''
+        from(bucket: "{project}")
+            |> range(start: -100y)
+            |> filter(fn: (r) => r._measurement == "dashboard_revision")
+            |> filter(fn: (r) => r._field == "revision")
+            |> sort(columns: ["_time"], desc: true)
+            |> limit(n: 1)''')
+    if result and result[0].records:
+        return result[0].records[0].get_value()

     return None


-def ingest_dashboard(api):
+def ingest_dashboard(client, project, api):
     index = revision_index(api)

-    revision_last = ingest_dashboard_revision_get()
+    revision_last = ingest_dashboard_revision_get(client, project)
     past = True if revision_last is None else False
     print('dashboard ingest: processing {:,} revisions starting after {}'.format(
         len(index), 'the beginning' if past else revision_last))

     filenames = ['config', 'repo_checker', 'version_snapshot']
     if api.project == 'openSUSE:Factory':
         filenames.append('devel_projects')
+    ingest_dashboard_config.client = client
+    ingest_dashboard_config.project = api.project

     count = 0
     points = []
@@ -554,40 +580,43 @@ def ingest_dashboard(api):


 def main(args):
-    global client
-    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)
-
-    osc.conf.get_config(override_apiurl=args.apiurl)
-    apiurl = osc.conf.config['apiurl']
-    osc.conf.config['debug'] = args.debug
-
-    # Ensure database exists.
-    client.create_database(client._database)
-
-    metrics_release.ingest(client)
-    if args.release_only:
-        return
-
-    # Use separate cache since it is persistent.
-    _, package = project_pseudometa_package(apiurl, args.project)
-    if args.wipe_cache:
-        Cache.delete_all()
-    if args.heavy_cache:
-        Cache.PATTERNS[r'/search/request'] = sys.maxsize
-        Cache.PATTERNS[r'/source/[^/]+/{}/_history'.format(package)] = sys.maxsize
-        Cache.PATTERNS[r'/source/[^/]+/{}/[^/]+\?rev=.*'.format(package)] = sys.maxsize
-    Cache.init('metrics')
-
-    Config(apiurl, args.project)
-    api = StagingAPI(apiurl, args.project)
-
-    print(f'dashboard: wrote {ingest_dashboard(api):,} points')
-
-    global who_workaround_swap, who_workaround_miss
-    who_workaround_swap = who_workaround_miss = 0
-
-    points_requests = ingest_requests(api, args.project)
-    points_schedule = ingest_release_schedule(args.project)
+    with InfluxDBClient(url=f"http://{args.host}:{args.port}",
+                        username=args.user,
+                        password=args.password,
+                        org="openSUSE-release-tools-metrics") as client:
+
+        osc.conf.get_config(override_apiurl=args.apiurl)
+        apiurl = osc.conf.config['apiurl']
+        osc.conf.config['debug'] = args.debug
+
+        # Ensure the bucket exists. Unlike the v1 create_database() call,
+        # creating an existing bucket raises an error, so check first.
+        buckets_api = client.buckets_api()
+        if buckets_api.find_bucket_by_name(args.project) is None:
+            buckets_api.create_bucket(bucket_name=args.project)
+
+        metrics_release.ingest(client, bucketname=args.project)
+        if args.release_only:
+            return
+
+        # Use separate cache since it is persistent.
+        _, package = project_pseudometa_package(apiurl, args.project)
+        if args.wipe_cache:
+            Cache.delete_all()
+        if args.heavy_cache:
+            Cache.PATTERNS[r'/search/request'] = sys.maxsize
+            Cache.PATTERNS[r'/source/[^/]+/{}/_history'.format(package)] = sys.maxsize
+            Cache.PATTERNS[r'/source/[^/]+/{}/[^/]+\?rev=.*'.format(package)] = sys.maxsize
+        Cache.init('metrics')
+
+        Config(apiurl, args.project)
+        api = StagingAPI(apiurl, args.project)
+
+        print(f'dashboard: wrote {ingest_dashboard(client, args.project, api):,} points')
+
+        global who_workaround_swap, who_workaround_miss
+        who_workaround_swap = who_workaround_miss = 0
+
+        points_requests = ingest_requests(client, api, args.project)
+        points_schedule = ingest_release_schedule(client, args.project)

     print('who_workaround_swap', who_workaround_swap)
     print('who_workaround_miss', who_workaround_miss)
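Two notes on the `main()` rewrite (the `create_bucket(bucket_name=...)` keyword and the existence check above are corrections: `create_bucket()` expects a `Bucket` object positionally, and unlike v1 `create_database()` it raises if the bucket already exists). The v2 client is also constructed from a URL and org rather than host/port/database. Assuming a reasonably recent influxdb-client release, `username`/`password` kwargs are accepted as a v1-compatible alternative to a token, from which the client derives a `username:password` token. A sketch of both forms, all values illustrative:

```python
from influxdb_client import InfluxDBClient

# Token authentication, the native v2 form.
client = InfluxDBClient(url='http://localhost:8086',
                        token='my-token', org='my-org')

# Username/password, as used in main() above; the client builds a
# "username:password" token from these kwargs internally.
client = InfluxDBClient(url='http://localhost:8086',
                        username='user', password='password', org='my-org')
```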
34 changes: 23 additions & 11 deletions metrics_release.py
@@ -1,9 +1,12 @@
-from dateutil.parser import parse as date_parse
-from metrics import timestamp
-import requests
 from urllib.parse import urljoin
+from datetime import datetime
+
+import requests
 import yaml
+from dateutil.parser import parse as date_parse
+from influxdb_client.client.write_api import SYNCHRONOUS
+
+from metrics import timestamp

 BASEURL = 'http://review.tumbleweed.boombatower.com/data/'

@@ -13,12 +16,21 @@ def data_load(name):
     return yaml.safe_load(response.text)


-def data_write(client, measurement, points):
-    client.drop_measurement(measurement)
-    client.write_points(points, 's')
+def data_drop(client, bucketname, measurement):
+    start = "1970-01-01T00:00:00Z"
+    stop = datetime.utcnow().isoformat() + "Z"
+    delete_api = client.delete_api()
+    delete_api.delete(start=start, stop=stop, bucket=bucketname,
+                      predicate=f'_measurement="{measurement}"')
+
+
+def data_write(client, bucketname, measurement, points):
+    data_drop(client, bucketname, measurement)
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+    write_api.write(bucket=bucketname, record=points, write_precision='s')


-def ingest_data(client, name):
+def ingest_data(client, bucketname, name):
     data = data_load(name)

     measurement = f'release_{name}'
@@ -31,7 +43,7 @@ def ingest_data(client, name):
             'time': timestamp(date_parse(release)),
         })

-    data_write(client, measurement, points)
+    data_write(client, bucketname, measurement, points)
     print(f'wrote {len(points)} for {name}')


@@ -59,10 +71,10 @@ def map_snapshot(details):
     }


-def ingest(client):
-    if client._database != 'openSUSE:Factory':
+def ingest(client, bucketname):
+    if bucketname != 'openSUSE:Factory':
         print('skipping release ingest for unsupported project')
         return

     for name in ['bug', 'mail', 'score', 'snapshot']:
-        ingest_data(client, name)
+        ingest_data(client, bucketname, name)
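A usage sketch for the reworked helpers (client setup, bucket name, and point values are illustrative): `data_write()` now takes the bucket explicitly instead of relying on a database bound to the client, so the same client can serve multiple buckets:

```python
# Hypothetical example point; 'time' is in seconds to match write_precision='s'.
points = [{
    'measurement': 'release_bug',
    'fields': {'count': 42},
    'time': 1704067200,
}]
data_write(client, 'openSUSE:Factory', 'release_bug', points)
```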
2 changes: 1 addition & 1 deletion requirements.txt
@@ -6,7 +6,7 @@ python-dateutil
 pyxdg
 cmdln
 git+https://github.com/openSUSE/osc
-influxdb
+influxdb-client

 # Dependencies for testing
 httpretty