Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't create scancel commands for users without jobs #137

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions bin/sync_slurm_acct.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
SYNC_TIMESTAMP_FILENAME = "/var/cache/%s.timestamp" % (NAGIOS_HEADER)
SYNC_SLURM_ACCT_LOGFILE = "/var/log/%s.log" % (NAGIOS_HEADER)

MAX_USERS_JOB_CANCEL = 10

class SyncSanityError(Exception):
pass

Expand Down Expand Up @@ -75,8 +73,16 @@ def main():
'store',
[PRODUCTION, PILOT]
),
'force': (
'Force the sync instead of bailing if too many scancel commands would be issues',
'max_scancel': ('Limit of scancel commands allowed', int, 'store', 20),
'max_user_del': ('Limit of user delete commands allowed', int, 'store', 10),
'force_scancel': (
'Force the sync instead of bailing if too many scancel commands would be issued',
None,
'store_true',
False
),
'force_user_del': (
'Force the sync instead of bailing if too many user delete commands would be issued',
None,
'store_true',
False
Expand Down Expand Up @@ -154,15 +160,25 @@ def main():
sacctmgr_commands = []

# safety to avoid emptying the cluster due to some error upstream
if not opts.options.force and len(job_cancel_commands) > MAX_USERS_JOB_CANCEL:
logging.warning("Would add commands to cancel jobs for %d users", len(job_cancel_commands))
if not opts.options.force_scancel and len(job_cancel_commands) > (opts.options.max_scancel / len(clusters)):
logging.warning("Would add %s scancel commands per cluster", len(job_cancel_commands) / len(clusters))
logging.debug("Would execute the following cancel commands:")
for jc in job_cancel_commands.values():
logging.debug("%s", jc)
raise SyncSanityError("Would cancel jobs for %d users" % len(job_cancel_commands))
raise SyncSanityError("Would run %d scancel jobs per cluster" % len(job_cancel_commands) / len(clusters))

sacctmgr_commands += [c for cl in job_cancel_commands.values() for c in cl]

# safety to avoid removing too many users due to some error upstream
max_user_del = opts.options.max_user_del / len(clusters)
if not opts.options.force_user_del and len(association_remove_commands) > max_user_del:
logging.warning("Would add commands to remove %d users", len(association_remove_commands) / len(clusters))
logging.debug("Would execute the following remove commands:")
for jc in association_remove_commands:
logging.debug("%s", jc)
raise SyncSanityError("Would run %d user delete commands per cluster" %
(len(association_remove_commands) / len(clusters)))

# removing users may fail, so should be done last
sacctmgr_commands += association_remove_commands

Expand Down
110 changes: 110 additions & 0 deletions lib/vsc/administration/slurm/sacct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#
# Copyright 2013-2022 Ghent University
#
# This file is part of vsc-administration,
# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en),
# with support of Ghent University (http://ugent.be/hpc),
# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be),
# the Flemish Research Foundation (FWO) (http://www.fwo.be/en)
# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en).
#
# https://github.com/hpcugent/vsc-administration
#
# All rights reserved.
#
"""
sacct commands
"""
import logging
import re
from enum import Enum

from vsc.accountpage.wrappers import mkNamedTupleInstance
from vsc.config.base import ANTWERPEN, BRUSSEL, GENT, LEUVEN
from vsc.utils.missing import namedtuple_with_defaults
from vsc.utils.run import asyncloop

# Absolute path of the sacct executable invoked by this module.
SLURM_SACCT = "/usr/bin/sacct"

# Organisation name per institute constant imported from vsc.config.base.
# NOTE(review): no use of this mapping is visible in this module -- confirm it is still needed here.
SLURM_ORGANISATIONS = {
    ANTWERPEN: 'uantwerpen',
    BRUSSEL: 'vub',
    GENT: 'ugent',
    LEUVEN: 'kuleuven',
}


class SacctParseException(Exception):
    """Raised when sacct output is parsed with an info type that is not supported."""

class SacctException(Exception):
    """Raised when the sacct command itself could not be run successfully."""


class SacctTypes(Enum):
    """Kinds of sacct listings that the parser in this module can handle."""

    # Job listing, parsed into SlurmJobs tuples.
    jobs = "jobs"


# Column names of the sacct job listing, as of Slurm 20.11.
# FIXME: at some point this should be versioned

SacctJobsFields = [
    "JobID",
    "JobName",
    "Partition",
    "Account",
    "AllocCPUS",
    "State",
    "ExitCode",
]

# Record type holding one line of the job listing, one attribute per column.
SlurmJobs = namedtuple_with_defaults('SlurmJobs', SacctJobsFields)

def mkSlurmJobs(fields):
    """Build a SlurmJobs named tuple from a mapping of field names to values."""
    return mkNamedTupleInstance(fields, SlurmJobs)

def parse_slurm_sacct_line(header, line, info_type):
    """
    Parse a single line of sacct output into the matching named tuple.

    @param header: list of column names, in the order they appear in the output
    @param line: one '|'-separated line of sacct output (without trailing newline)
    @param info_type: SacctTypes member selecting the record type to build

    @return: named tuple instance for the given info_type

    @raise SacctParseException: if info_type is not a supported SacctTypes member
    """
    fields = line.split("|")

    if info_type == SacctTypes.jobs:
        creator = mkSlurmJobs
    else:
        # Interpolate the message here: exception constructors do not apply
        # %-formatting to their arguments the way logging calls do.
        raise SacctParseException("info_type %s does not exist." % (info_type,))

    # Pair each column name with the value found at the same position.
    return creator(dict(zip(header, fields)))


def parse_slurm_sacct_dump(lines, info_type):
    """
    Parse the output of a sacct listing.

    @param lines: sequence of output lines; the first one is the '|'-separated header
    @param info_type: SacctTypes member selecting the record type to build

    @return: set of named tuples, one per data line; empty if there are no data lines

    Any exception raised while parsing a line is logged and re-raised.
    """
    acct_info = set()

    # No output at all means there is no header to read and nothing to parse.
    if not lines:
        return acct_info

    # Turn the column names into valid attribute names.
    header = [w.replace(' ', '_').replace('%', 'PCT_') for w in lines[0].rstrip().split("|")]

    for line in lines[1:]:
        logging.debug("line %s", line)
        line = line.rstrip()
        try:
            info = parse_slurm_sacct_line(header, line, info_type)
        except Exception as err:
            logging.exception("Slurm sacct parse dump: could not process line %s [%s]", line, err)
            raise
        # Only keep lines for which the line parser produced a record.
        if info:
            acct_info.add(info)

    return acct_info


def get_slurm_sacct_active_jobs_for_user(user):
    """
    Get running and queued jobs for user.

    @param user: name of the user whose jobs are looked up

    @return: set of SlurmJobs tuples for the RUNNING and PENDING jobs of the user
             on all clusters, or None if sacct reports that the user id is invalid

    @raise SacctException: if sacct fails for any other reason
    """
    (exitcode, contents) = asyncloop([
        SLURM_SACCT, "--allclusters", "--parsable2", "--state", "RUNNING,PENDING", "--user", user])
    if exitcode != 0:
        # Escape the user name so it is matched literally inside the pattern.
        if re.search("sacct: error: Invalid user id: %s" % re.escape(user), contents):
            logging.warning("User %s does not exist, assuming no active jobs.", user)
            return None
        raise SacctException("Cannot run sacct (exit code %s): %s" % (exitcode, contents))

    # The parser dispatches on a SacctTypes member; passing the list of column
    # names instead makes every data line fail with SacctParseException.
    return parse_slurm_sacct_dump(contents.splitlines(), SacctTypes.jobs)
7 changes: 5 additions & 2 deletions lib/vsc/administration/slurm/sacctmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from vsc.config.base import ANTWERPEN, BRUSSEL, GENT, LEUVEN
from vsc.utils.missing import namedtuple_with_defaults
from vsc.utils.run import asyncloop

from vsc.administration.slurm.scancel import create_remove_user_jobs_command

SLURM_SACCT_MGR = "/usr/bin/sacctmgr"
Expand All @@ -39,6 +38,10 @@ class SacctMgrException(Exception):
pass


class SacctMgrParseException(Exception):
    """Raised when sacctmgr output is parsed with an info type that does not exist."""


class SacctMgrTypes(Enum):
accounts = "accounts"
users = "users"
Expand Down Expand Up @@ -141,7 +144,7 @@ def parse_slurm_sacct_line(header, line, info_type, user_field_number, account_f
elif info_type == SacctMgrTypes.resource:
creator = mkSlurmResource
else:
return None
raise SacctMgrParseException("info_type %s does not exist.", info_type)

return creator(dict(zip(header, fields)))

Expand Down
8 changes: 5 additions & 3 deletions lib/vsc/administration/slurm/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
create_add_account_command, create_remove_account_command,
create_change_account_fairshare_command,
create_add_user_command, create_change_user_command, create_remove_user_command, create_remove_user_account_command,
create_add_qos_command, create_remove_qos_command, create_modify_qos_command
create_add_qos_command, create_remove_qos_command, create_modify_qos_command,
)
from vsc.administration.slurm.scancel import (
create_remove_user_jobs_command, create_remove_jobs_for_account_command,
)

from vsc.administration.slurm.sacct import get_slurm_sacct_active_jobs_for_user

class SlurmSyncException(Exception):
    """Exception type of the Slurm sync module."""
Expand Down Expand Up @@ -373,7 +373,9 @@ def slurm_user_accounts(vo_members, active_accounts, slurm_user_info, clusters,
])

for user in remove_users:
job_cancel_commands[user].append(create_remove_user_jobs_command(user=user, cluster=cluster))
active_jobs = get_slurm_sacct_active_jobs_for_user(user)
if active_jobs:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot of boilerplate, but this is the main change.

job_cancel_commands[user].append(create_remove_user_jobs_command(user=user, cluster=cluster))
wdpypere marked this conversation as resolved.
Show resolved Hide resolved

# Remove users from the clusters (in all accounts)
association_remove_commands.extend([
Expand Down
45 changes: 45 additions & 0 deletions test/slurm_sacct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Copyright 2015-2022 Ghent University
#
# This file is part of vsc-administration,
# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en),
# with support of Ghent University (http://ugent.be/hpc),
# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be),
# the Flemish Research Foundation (FWO) (http://www.fwo.be/en)
# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en).
#
# https://github.com/hpcugent/vsc-administration
#
# All rights reserved.
#
"""
Tests for vsc.administration.slurm.*

@author: Andy Georges (Ghent University)
"""

from vsc.install.testing import TestCase

from vsc.administration.slurm.sacct import parse_slurm_sacct_dump, SlurmJobs, SacctTypes, SacctParseException


class SlurmSacctmgrTest(TestCase):
    """Tests for the sacct output parser."""

    def test_parse_slurmm_sacct_dump(self):
        """Test that the sacct output is correctly processed."""

        # Header line followed by one job and its two steps.
        raw_lines = [
            "JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode",
            "14367800|normal|part1|acc1|1|RUNNING|0:0",
            "14367800.batch|batch||acc1|1|RUNNING|0:0",
            "14367800.extern|extern||acc1|1|RUNNING|0:0",
        ]
        parsed = parse_slurm_sacct_dump(raw_lines, SacctTypes.jobs)

        expected = {
            SlurmJobs(
                JobID='14367800', JobName='normal', Partition='part1', Account='acc1',
                AllocCPUS='1', State='RUNNING', ExitCode='0:0',
            ),
            SlurmJobs(
                JobID='14367800.batch', JobName='batch', Partition='', Account='acc1',
                AllocCPUS='1', State='RUNNING', ExitCode='0:0',
            ),
            SlurmJobs(
                JobID='14367800.extern', JobName='extern', Partition='', Account='acc1',
                AllocCPUS='1', State='RUNNING', ExitCode='0:0',
            ),
        }
        self.assertEqual(set(parsed), expected)

        # An info type the parser does not know must be rejected.
        with self.assertRaises(SacctParseException):
            parse_slurm_sacct_dump("sacct_active_jobs_output", "doesnotexist")
4 changes: 4 additions & 0 deletions test/slurm_sacctmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from vsc.administration.slurm.sacctmgr import (
parse_slurm_sacct_dump,
SacctMgrTypes, SlurmAccount, SlurmUser,
SacctMgrParseException
)


Expand Down Expand Up @@ -69,3 +70,6 @@ def test_parse_slurmm_sacct_dump(self):
]))



with self.assertRaises(SacctMgrParseException):
parse_slurm_sacct_dump("sacctmgr_active_jobs_output", "doesnotexist")
4 changes: 3 additions & 1 deletion test/slurm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
slurm_project_qos,
)

from mock import patch

VO = namedtuple("VO", ["vsc_id", "institute", "fairshare", "qos"])
VO.__new__.__defaults__ = (None,) * len(VO._fields)
Expand Down Expand Up @@ -207,7 +208,8 @@ def test_slurm_user_accounts(self):
SlurmUser(User='user5', Def_Acct='vo2', Admin='None', Cluster='banette', Account='vo2', Partition='', Share='1', MaxJobs='', MaxNodes='', MaxCPUs='', MaxSubmit='', MaxWall='', MaxCPUMins='', QOS='normal', Def_QOS=''),
]

(job_cancel_commands, commands, remove_user_commands) = slurm_user_accounts(vo_members, active_accounts, slurm_user_info, ["banette"])
with patch("vsc.administration.slurm.sync.get_slurm_sacct_active_jobs_for_user", return_value="testing"):
(job_cancel_commands, commands, remove_user_commands) = slurm_user_accounts(vo_members, active_accounts, slurm_user_info, ["banette"])

self.assertEqual(set([tuple(x) for x in commands]), set([tuple(x) for x in [
shlex.split("/usr/bin/sacctmgr -i add user user6 Account=vo2 Cluster=banette DefaultAccount=vo2"),
Expand Down