Merge pull request #183 from mwestphall/merge-kapel
Add script to convert kapel output records to Gratia records
mwestphall authored Aug 14, 2024
2 parents 459959f + 55c32c2 commit cbb05d8
Showing 8 changed files with 396 additions and 0 deletions.
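For orientation: each record that kapel writes to the directory queue is a plain-text APEL "individual job message" of colon-separated key/value lines, which kubernetes/kubernetes_meter.py (below) parses and maps onto a Gratia UsageRecord. A sketch of such a record, using the field names the converter reads (all values are illustrative):

APEL-individual-job-message: v0.3
Site: EXAMPLE-K8S-SITE
VO: osg
LocalJobId: job-1234
MachineName: example-k8s-cluster
Processors: 2
StartTime: 1723600000
EndTime: 1723603600
WallDuration: 3600
CpuDuration: 3500
MemoryVirtual: 2048000
InfrastructureType: grid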
70 changes: 70 additions & 0 deletions .github/workflows/build-k8s-container.yaml
@@ -0,0 +1,70 @@
---
name: Build and Push Kubernetes Probe Docker image

on:
  push:
    branches: [ 2.x ]
    paths:
      - '.github/workflows/build-k8s-container.yaml'
      - 'kubernetes/**'
      - 'common/**'
  repository_dispatch:
    types:
      - dispatch-build
  workflow_dispatch:
jobs:
  build:
    runs-on: ubuntu-latest
    if: startsWith(github.repository, 'opensciencegrid/')
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0

      - name: make date tag
        id: mkdatetag
        run: echo "dtag=$(date +%Y%m%d-%H%M)" >> $GITHUB_OUTPUT

      - name: make tags
        id: mktags
        env:
          DTAG: ${{ steps.mkdatetag.outputs.dtag }}
        run: |
          reponame="osg-htc/gratia-probe-k8s"
          OSGVER=23
          tags=()
          for registry in docker.io hub.opensciencegrid.org; do
            tags+=("$registry/$reponame:$OSGVER-release")
            tags+=("$registry/$reponame:$OSGVER-release-$DTAG")
          done
          IFS=,
          echo "image_tags=${tags[*]}" >> $GITHUB_OUTPUT
          echo "ts_image=hub.opensciencegrid.org/$reponame:$OSGVER-release-$DTAG" >> $GITHUB_OUTPUT
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Log in to Docker Hub
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}

      - name: Log in to OSG Harbor
        uses: docker/login-action@v2
        with:
          registry: hub.opensciencegrid.org
          username: ${{ secrets.OSG_HARBOR_ROBOT_USER }}
          password: ${{ secrets.OSG_HARBOR_ROBOT_PASSWORD }}

      - name: Build and push Docker images
        uses: docker/build-push-action@v4
        with:
          context: kubernetes
          file: kubernetes/Dockerfile
          push: true
          build-args: |
            TIMESTAMP_IMAGE=${{ steps.mktags.outputs.ts_image }}
          tags: "${{ steps.mktags.outputs.image_tags }}"
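With DTAG=20240814-1200, the "make tags" step above produces step outputs roughly like the following (values are illustrative):

image_tags=docker.io/osg-htc/gratia-probe-k8s:23-release,docker.io/osg-htc/gratia-probe-k8s:23-release-20240814-1200,hub.opensciencegrid.org/osg-htc/gratia-probe-k8s:23-release,hub.opensciencegrid.org/osg-htc/gratia-probe-k8s:23-release-20240814-1200
ts_image=hub.opensciencegrid.org/osg-htc/gratia-probe-k8s:23-release-20240814-1200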
22 changes: 22 additions & 0 deletions kubernetes/Dockerfile
@@ -0,0 +1,22 @@
FROM almalinux:9

# TIMESTAMP_IMAGE is supplied as a build-arg by the build workflow; the $(date ...) in
# this default is not expanded by Docker, so it is only a placeholder value.
ARG TIMESTAMP_IMAGE=gratia-probe-k8s:release-$(date +%Y%m%d-%H%M)
ENV GRATIA_PROBE_VERSION=$TIMESTAMP_IMAGE
ARG UID=10000
ARG GID=10000

# Install EPEL, the OSG software base repo, and gratia-probe-common
RUN yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm && \
    yum install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el9-release-latest.rpm && \
    yum install -y gratia-probe-common python3-pip

# Make probe runnable as non-root
RUN chown -R $UID:$GID /var/lock/gratia /var/lib/gratia/data /var/lib/gratia/tmp /var/log/gratia
WORKDIR /gratia
COPY requirements.txt /gratia
RUN pip install -r requirements.txt
COPY ProbeConfig /etc/gratia/kubernetes/
COPY *.py /gratia/

USER $UID:$GID
CMD python3 kubernetes_meter.py
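For local testing, something along these lines should build an equivalent image from the repository root (the tag and timestamp are illustrative; in CI the TIMESTAMP_IMAGE build-arg comes from the workflow above):

docker build --build-arg TIMESTAMP_IMAGE=gratia-probe-k8s:23-release-20240814-1200 \
    -t gratia-probe-k8s:23-release-20240814-1200 kubernetes/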
75 changes: 75 additions & 0 deletions kubernetes/ProbeConfig
@@ -0,0 +1,75 @@
<ProbeConfiguration

Title1="Collector Information"

CollectorHost="gratia-osg-prod.opensciencegrid.org:80"
SSLHost="gratia-osg-prod.opensciencegrid.org:443"
SSLRegistrationHost="gratia-osg-prod.opensciencegrid.org:80"

CollectorService="/gratia-servlets/rmi"
SSLCollectorService="/gratia-servlets/rmi"
RegistrationService="/gratia-registration/register"

Title2="Probe information and functional configuration"

ProbeName="k8s:localhost"
SiteName="Kubernetes Probe"
Grid="OSG"
SuppressUnknownVORecords="0"
SuppressNoDNRecords="0"
SuppressGridLocalRecords="0"
QuarantineUnknownVORecords="1"
EnableProbe="1"

Title3="Tuning parameter"

BundleSize="100"
Comments28="Number of records to be sent per envelope. Consider setting higher (up to 200)."
MaxPendingFiles="100000"
MaxStagedArchives="400"
Comments30="The maximum number of backlog files is MaxPendingFiles * MaxStagedArchives per probe"
UseSyslog="0"
ConnectionTimeout="900"
Comments31="Number of seconds GratiaCore will wait before timing out an attempt to connect or post to the Collector"

LogLevel="2"
Comments32="Controls debug messages printed to log file."
DebugLevel="0"
Comments33="Controls debug messages printed to screen."
LogRotate="31"
LogFileName=""
Comments34="If LogFileName is set, then there is no log file rotation. Otherwise log file rotation will create one new file each day and delete the older ones"
DataLengthMax="0"
Comments35="The maximum number of records (or hours, depending on the probe) processed in a single invocation, 0 = all records, no max"
DataFileExpiration="31"
Comments36="The number of days quarantined and unusable data files are kept"
QuarantineSize="200"
Comments37="The maximum size in Mb allowed to be kept in each quarantined directory"
GratiaExtension="gratia.xml"

Title4="Authentication Configuration"

UseSSL="0"
CertificateFile="/etc/grid-security/hostcert.pem"
KeyFile="/etc/grid-security/hostkey.pem"
UseGratiaCertificates="0"
Comments40="If no directory is specified the gratia certificate file will be created in 'WorkingFolder'/certs."
GratiaCertificateFile="/var/lib/gratia/data/certs/gratia.probecert.pem"
GratiaKeyFile="/var/lib/gratia/data/certs/gratia.probekey.pem"

Title5="File and directory location"

UserVOMapFile="/var/lib/osg/user-vo-map"
Comments51="Location and wildcard pattern of log files that contain certificate information about the jobs in the format followed by the 'blah daemons'."
CertInfoLogPattern="/var/log/accounting/blahp.log-*"
CondorCEHistoryFolder="/var/lib/gratia/condorce_data"

DataFolder="/var/lib/gratia/data/"
WorkingFolder="/var/lib/gratia/tmp"
LogFolder="/var/log/gratia"



/>

<!-- This probe has not yet been configured -->
10 changes: 10 additions & 0 deletions kubernetes/docker-compose.yaml
@@ -0,0 +1,10 @@
services:
  gratia-output:
    image: hub.opensciencegrid.org/osgpreview/kapel-gratia-output:latest
    build:
      network: host
      context: .
      dockerfile: Dockerfile
    volumes:
      - /tmp/dirq/:/srv/kapel
      - ./sample.env:/gratia/.env
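A hypothetical sample.env for the bind mount above, using the variable names read by gratia_k8s_config.py (values are illustrative; GRATIA_CONFIG_PATH is assumed to point at the ProbeConfig file installed by the Dockerfile):

OUTPUT_PATH=/srv/kapel
INFRASTRUCTURE_TYPE=grid
INFRASTRUCTURE_DESCRIPTION=GRATIA-KUBERNETES
NODECOUNT=0
PROCESSORS=0
GRATIA_CONFIG_PATH=/etc/gratia/kubernetes/ProbeConfig
GRATIA_PROBE_VERSION=gratia-probe-k8s:23-release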
29 changes: 29 additions & 0 deletions kubernetes/gratia_k8s_config.py
@@ -0,0 +1,29 @@
# Configuration module for KAPEL

from environs import Env

# Read config settings from environment variables (and a named env file in CWD if specified),
# do input validation, and return a config object. Note, if a '.env' file exists in CWD it will be used by default.
class GratiaK8sConfig:
    def __init__(self, envFile=None):
        env = Env()
        # Read a .env file if one is specified, otherwise only environment variables will be used.
        env.read_env(envFile, recurse=False, verbose=True)

        # Where to write the APEL message output.
        self.output_path = env.path("OUTPUT_PATH", "/srv/kapel")

        # infrastructure info
        self.infrastructure_type = env.str("INFRASTRUCTURE_TYPE", "grid")
        self.infrastructure_description = env.str("INFRASTRUCTURE_DESCRIPTION", "GRATIA-KUBERNETES")

        # optionally define number of nodes and processors. Should not be necessary to
        # set a default of 0 here but see https://github.com/apel/apel/issues/241
        self.nodecount = env.int("NODECOUNT", 0)
        self.processors = env.int("PROCESSORS", 0)

        # Gratia config
        self.gratia_config_path = env.str("GRATIA_CONFIG_PATH", "/etc/gratia/kubernetes/")

        self.gratia_probe_version = env.str("GRATIA_PROBE_VERSION", None)
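A minimal usage sketch of the config class (hypothetical; assumes environs is installed and the relevant variables are set in the environment or an env file):

# Load configuration from the environment only (pass a filename to also read an env file)
from gratia_k8s_config import GratiaK8sConfig

cfg = GratiaK8sConfig()          # or GratiaK8sConfig("sample.env")
print(cfg.output_path)           # e.g. /srv/kapel
print(cfg.gratia_config_path)    # e.g. /etc/gratia/kubernetes/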
179 changes: 179 additions & 0 deletions kubernetes/kubernetes_meter.py
@@ -0,0 +1,179 @@
import argparse
import os
import re
from datetime import datetime, timezone

from gratia_k8s_config import GratiaK8sConfig
from dirq.QueueSimple import QueueSimple

from gratia.common.Gratia import DebugPrint
from gratia.common.debug import DebugPrintTraceback
import gratia.common.GratiaCore as GratiaCore
import gratia.common.GratiaWrapper as GratiaWrapper
import gratia.common.Gratia as Gratia
import gratia.common.config as config

probe_version = "%%%RPMVERSION%%%"

probe_name = os.path.basename(os.path.dirname(os.path.abspath(__file__)))

# log levels
CRITICAL = 0
ERROR = 1
WARNING = 2
INFO = 3
DEBUG = 4

# Gratia record constants
SECONDS = "Was entered in seconds"
LOCAL_USER = "osgvo-container-pilot"
USER = "user"
RESOURCE_TYPE = "Batch"

BATCH_SIZE = 100

class ApelRecordConverter():
    apel_dict: dict

    def __init__(self, apel_record: bytes):
        self._parse_apel_str(apel_record.decode())

    def _parse_apel_str(self, apel_record: str):
        self.apel_dict = {}
        lines = apel_record.split('\n')
        for line in lines:
            kv_pair = [v.strip() for v in line.split(':')]
            if len(kv_pair) == 2:
                self.apel_dict[kv_pair[0]] = kv_pair[1]

    def getint(self, key):
        return int(float(self.apel_dict.get(key, 0)))

    def get(self, key):
        return self.apel_dict.get(key)

    def site_probe(self):
        site_dns = re.sub(r'[^a-zA-Z0-9-]', '-', self.get('Site')).strip('-')  # sanitize site
        return "kubernetes:%s.gratia.osg-htc.org" % site_dns

    def to_gratia_record(self):
        # TODO the following fields are not currently tracked:
        # memory, machine name, grid
        r = Gratia.UsageRecord(RESOURCE_TYPE)
        r.StartTime(self.getint('StartTime'), SECONDS)
        r.MachineName(self.get('MachineName'))
        r.LocalJobId(self.get('LocalJobId'))
        r.EndTime(self.getint('EndTime'), SECONDS)
        r.WallDuration(self.getint('WallDuration'), SECONDS)
        r.CpuDuration(self.getint('CpuDuration'), USER, SECONDS)
        r.Memory(self.getint('MemoryVirtual'), 'KB', description='RSS')
        r.Processors(self.getint('Processors'), metric="max")
        r.SiteName(self.get('Site'))
        r.ProbeName(self.site_probe())
        r.Grid(self.get('InfrastructureType'))  # Best guess
        r.LocalUserId(LOCAL_USER)
        r.VOName(self.get('VO'))
        r.ReportableVOName(self.get('VO'))
        return r

    @classmethod
    def is_individual_record(cls, data: bytes):
        return data.decode().startswith('APEL-individual-job-message:')


def send_gratia_records(records: list[ApelRecordConverter]):
    # TODO the assumption of uniform site/probe might not be true
    site = records[0].get('Site')
    probe = records[0].site_probe()

    # GratiaCore.Initialize(gratia_config)

    config.Config.setSiteName(site)
    config.Config.setMeterName(probe)

    GratiaCore.Handshake()

    try:
        GratiaCore.SearchOutstandingRecord()
    except Exception as e:
        print(f"Failed to search outstanding records: {e}")
        raise

    GratiaCore.Reprocess()

    for record in records:
        resp = GratiaCore.Send(record.to_gratia_record())
        print(resp)

    GratiaCore.ProcessCurrentBundle()




def setup_gratia(config: GratiaK8sConfig):
    print(config.gratia_config_path)
    if not config.gratia_config_path or not os.path.exists(config.gratia_config_path):
        raise Exception("No valid gratia config path given")
    GratiaCore.Config = GratiaCore.ProbeConfiguration(config.gratia_config_path)

    GratiaWrapper.CheckPreconditions()
    GratiaWrapper.ExclusiveLock()

    # Register gratia
    GratiaCore.RegisterReporter("kubernetes_meter")
    GratiaCore.RegisterService("Kubernetes", config.gratia_probe_version)
    GratiaCore.setProbeBatchManager("kubernetes")

    GratiaCore.Initialize(config.gratia_config_path)

def batch_dirq(queue, batch_size):
    """ batch the records in a dirq into groups of a fixed size """
    # TODO this is not the most elegant approach
    records = []
    for name in queue:
        records.append(name)
        if len(records) >= batch_size:
            yield records
            records = []
    # Yield the last (partial) batch as well, if there is one
    if records:
        yield records

def main(envFile: str):
    print(f'Starting Gratia post-processor: {__file__} with envFile {envFile} at {datetime.now(tz=timezone.utc).isoformat()}')
    if envFile:
        with open(envFile) as envf:
            print(f'===== envFile contents: =====\n{envf.read()}')

    cfg = GratiaK8sConfig(envFile)

    setup_gratia(cfg)

    dirq = QueueSimple(str(cfg.output_path))
    for batch in batch_dirq(dirq, BATCH_SIZE):
        apel_records = []
        for name in batch:
            if not dirq.lock(name):
                continue
            data = dirq.get(name)
            if ApelRecordConverter.is_individual_record(data):
                apel_records.append(ApelRecordConverter(data))
            dirq.unlock(name)
        if apel_records:
            send_gratia_records(apel_records)
        # Clear out the chunk of the queue if all the sends succeed
        for name in batch:
            if not dirq.lock(name):
                continue
            dirq.remove(name)
    print("done")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert KAPEL accounting records from a directory queue into Gratia usage records and send them to a Gratia collector.")
    # This should be the only CLI argument, since all other config should be specified via env.
    parser.add_argument("-e", "--env-file", default=None, help="name of file containing environment variables for configuration")
    args = parser.parse_args()
    main(args.env_file)
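A quick parsing smoke test for ApelRecordConverter (hypothetical; the field values are made up, and importing kubernetes_meter requires gratia-probe-common to be installed):

# Parse a minimal APEL individual-job message and inspect a few fields
from kubernetes_meter import ApelRecordConverter

sample = (b"APEL-individual-job-message: v0.3\n"
          b"Site: EXAMPLE-K8S-SITE\n"
          b"VO: osg\n"
          b"WallDuration: 3600\n"
          b"Processors: 2\n")

print(ApelRecordConverter.is_individual_record(sample))   # True
rec = ApelRecordConverter(sample)
print(rec.get('VO'), rec.getint('WallDuration'))           # osg 3600
print(rec.site_probe())   # kubernetes:EXAMPLE-K8S-SITE.gratia.osg-htc.org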
4 changes: 4 additions & 0 deletions kubernetes/requirements.txt
@@ -0,0 +1,4 @@
# Useful for handling configuration
environs==11.0.0
# Useful for reading records from the directory queue written by kapel
dirq==1.8
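For reference, a producer such as kapel would enqueue messages into this directory queue roughly like so (hypothetical sketch; the path matches the probe's default OUTPUT_PATH and the message body is illustrative):

# Add one APEL message to the directory queue that kubernetes_meter.py consumes
from dirq.QueueSimple import QueueSimple

queue = QueueSimple('/srv/kapel')
queue.add('APEL-individual-job-message: v0.3\nSite: EXAMPLE-K8S-SITE\nVO: osg\n')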