Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't create scancel commands for users without jobs #137

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions bin/sync_slurm_acct.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
SYNC_TIMESTAMP_FILENAME = "/var/cache/%s.timestamp" % (NAGIOS_HEADER)
SYNC_SLURM_ACCT_LOGFILE = "/var/log/%s.log" % (NAGIOS_HEADER)

MAX_USERS_JOB_CANCEL = 10

class SyncSanityError(Exception):
pass

Expand Down Expand Up @@ -75,8 +73,16 @@ def main():
'store',
[PRODUCTION, PILOT]
),
'force': (
'Force the sync instead of bailing if too many scancel commands would be issues',
'max_scancel': ('Limit of scancel commands allowed', int, 'store', 20),
'max_user_del': ('Limit of user delete commands allowed', int, 'store', 10),
'force_scancel': (
'Force the sync instead of bailing if too many scancel commands would be issued',
None,
'store_true',
False
),
'force_user_del': (
'Force the sync instead of bailing if too many user delete commands would be issued',
None,
'store_true',
False
Expand Down Expand Up @@ -154,15 +160,25 @@ def main():
sacctmgr_commands = []

# safety to avoid emptying the cluster due to some error upstream
if not opts.options.force and len(job_cancel_commands) > MAX_USERS_JOB_CANCEL:
logging.warning("Would add commands to cancel jobs for %d users", len(job_cancel_commands))
if not opts.options.force_scancel and len(job_cancel_commands) > (opts.options.max_scancel / len(clusters)):
logging.warning("Would add %s scancel commands per cluster", len(job_cancel_commands) / len(clusters))
logging.debug("Would execute the following cancel commands:")
for jc in job_cancel_commands.values():
logging.debug("%s", jc)
raise SyncSanityError("Would cancel jobs for %d users" % len(job_cancel_commands))
raise SyncSanityError("Would run %d scancel jobs per cluster" % len(job_cancel_commands) / len(clusters))

sacctmgr_commands += [c for cl in job_cancel_commands.values() for c in cl]

# safety to avoid removing too many users due to some error upstream
max_user_del = opts.options.max_user_del / len(clusters)
if not opts.options.force_user_del and len(association_remove_commands) > max_user_del:
logging.warning("Would add commands to remove %d users", len(association_remove_commands) / len(clusters))
logging.debug("Would execute the following remove commands:")
for jc in association_remove_commands:
logging.debug("%s", jc)
raise SyncSanityError("Would run %d user delete commands per cluster" %
(len(association_remove_commands) / len(clusters)))

# removing users may fail, so should be done last
sacctmgr_commands += association_remove_commands

Expand Down
39 changes: 38 additions & 1 deletion lib/vsc/administration/slurm/sacctmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
sacctmgr commands
"""
import logging
import re
from enum import Enum

from vsc.accountpage.wrappers import mkNamedTupleInstance
Expand All @@ -26,6 +27,7 @@
from vsc.administration.slurm.scancel import create_remove_user_jobs_command

SLURM_SACCT_MGR = "/usr/bin/sacctmgr"
SLURM_SACCT = "/usr/bin/sacct"
wdpypere marked this conversation as resolved.
Show resolved Hide resolved

SLURM_ORGANISATIONS = {
ANTWERPEN: 'uantwerpen',
Expand All @@ -39,11 +41,16 @@ class SacctMgrException(Exception):
pass


class SacctParseException(Exception):
pass


class SacctMgrTypes(Enum):
accounts = "accounts"
users = "users"
qos = "qos"
resource = "resource"
activejobs = "activejobs"
wdpypere marked this conversation as resolved.
Show resolved Hide resolved


# Fields for Slurm 20.11.
Expand Down Expand Up @@ -74,6 +81,10 @@ class SacctMgrTypes(Enum):
"Name", "Server", "Type", "Count", "PCT__Allocated", "ServerType",
]

SacctActivejobsFields = [
"JobID", "JobName", "Partition", "Account", "AllocCPUS", "State", "ExitCode",
]

IGNORE_USERS = ["root"]
IGNORE_ACCOUNTS = ["root"]
IGNORE_QOS = ["normal"]
Expand All @@ -82,6 +93,7 @@ class SacctMgrTypes(Enum):
SlurmUser = namedtuple_with_defaults('SlurmUser', SacctUserFields)
SlurmQos = namedtuple_with_defaults('SlurmQos', SacctQosFields)
SlurmResource = namedtuple_with_defaults('SlurmResource', SacctResourceFields)
SlurmActivejobs = namedtuple_with_defaults('SlurmActivejobs', SacctActivejobsFields)


def mkSlurmAccount(fields):
Expand All @@ -106,6 +118,12 @@ def mkSlurmQos(fields):
return qos


def mkSlurmActivejobs(fields):
"""Make a named tuple from the given fields"""
activejobs = mkNamedTupleInstance(fields, SlurmActivejobs)
return activejobs


def mkSlurmResource(fields):
"""Make a named tuple from the given fields"""
fields['Count'] = int(fields['Count'])
Expand Down Expand Up @@ -140,8 +158,10 @@ def parse_slurm_sacct_line(header, line, info_type, user_field_number, account_f
creator = mkSlurmQos
elif info_type == SacctMgrTypes.resource:
creator = mkSlurmResource
elif info_type == SacctMgrTypes.activejobs:
creator = mkSlurmActivejobs
else:
return None
raise SacctParseException("info_type %s does not exist.", info_type)

return creator(dict(zip(header, fields)))

Expand Down Expand Up @@ -514,3 +534,20 @@ def create_modify_resource_license_command(name, server, stype, clusters, count)
]

return command


def get_slurm_sacct_active_jobs_for_user(user):
"""
Get running and queued jobs for user.
"""
(exitcode, contents) = asyncloop([SLURM_SACCT, "-L", "-P", "-s", "r", "-u", user])
wdpypere marked this conversation as resolved.
Show resolved Hide resolved
if exitcode != 0:
if re.search("sacct: error: Invalid user id: %s" % user, contents):
logging.warning("User %s does not exist, assuming no active jobs.", user)
return None
else:
raise SacctMgrException("Cannot run sacct")

info = parse_slurm_sacct_dump(contents.splitlines(), SacctActivejobsFields)
return info

6 changes: 4 additions & 2 deletions lib/vsc/administration/slurm/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
create_add_account_command, create_remove_account_command,
create_change_account_fairshare_command,
create_add_user_command, create_change_user_command, create_remove_user_command, create_remove_user_account_command,
create_add_qos_command, create_remove_qos_command, create_modify_qos_command
create_add_qos_command, create_remove_qos_command, create_modify_qos_command, get_slurm_sacct_active_jobs_for_user,
)
from vsc.administration.slurm.scancel import (
create_remove_user_jobs_command, create_remove_jobs_for_account_command,
Expand Down Expand Up @@ -373,7 +373,9 @@ def slurm_user_accounts(vo_members, active_accounts, slurm_user_info, clusters,
])

for user in remove_users:
job_cancel_commands[user].append(create_remove_user_jobs_command(user=user, cluster=cluster))
active_jobs = get_slurm_sacct_active_jobs_for_user(user)
if active_jobs:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot of boilerplate, but this is the main change.

job_cancel_commands[user].append(create_remove_user_jobs_command(user=user, cluster=cluster))
wdpypere marked this conversation as resolved.
Show resolved Hide resolved

# Remove users from the clusters (in all accounts)
association_remove_commands.extend([
Expand Down
20 changes: 19 additions & 1 deletion test/slurm_sacctmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
from vsc.install.testing import TestCase

from vsc.administration.slurm.sacctmgr import (
parse_slurm_sacct_dump,
parse_slurm_sacct_dump, SlurmActivejobs,
SacctMgrTypes, SlurmAccount, SlurmUser,
SacctParseException
)


Expand Down Expand Up @@ -69,3 +70,20 @@ def test_parse_slurmm_sacct_dump(self):
]))



sacctmgr_active_jobs_output = [
"JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode",
"14367800|normal|part1|acc1|1|RUNNING|0:0",
"14367800.batch|batch||acc1|1|RUNNING|0:0",
"14367800.extern|extern||acc1|1|RUNNING|0:0",
]
info = parse_slurm_sacct_dump(sacctmgr_active_jobs_output, SacctMgrTypes.activejobs)
self.assertEqual(set(info), set([
SlurmActivejobs(JobID='14367800', JobName='normal', Partition='part1', Account='acc1', AllocCPUS='1', State='RUNNING', ExitCode='0:0'),
SlurmActivejobs(JobID='14367800.batch', JobName='batch', Partition='', Account='acc1', AllocCPUS='1', State='RUNNING', ExitCode='0:0'),
SlurmActivejobs(JobID='14367800.extern', JobName='extern', Partition='', Account='acc1', AllocCPUS='1', State='RUNNING', ExitCode='0:0')
]))


with self.assertRaises(SacctParseException):
parse_slurm_sacct_dump("sacctmgr_active_jobs_output", "doesnotexist")
4 changes: 3 additions & 1 deletion test/slurm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
slurm_project_qos,
)

from mock import patch

VO = namedtuple("VO", ["vsc_id", "institute", "fairshare", "qos"])
VO.__new__.__defaults__ = (None,) * len(VO._fields)
Expand Down Expand Up @@ -207,7 +208,8 @@ def test_slurm_user_accounts(self):
SlurmUser(User='user5', Def_Acct='vo2', Admin='None', Cluster='banette', Account='vo2', Partition='', Share='1', MaxJobs='', MaxNodes='', MaxCPUs='', MaxSubmit='', MaxWall='', MaxCPUMins='', QOS='normal', Def_QOS=''),
]

(job_cancel_commands, commands, remove_user_commands) = slurm_user_accounts(vo_members, active_accounts, slurm_user_info, ["banette"])
with patch("vsc.administration.slurm.sacctmgr.get_slurm_sacct_active_jobs_for_user", return_value="testing"):
wdpypere marked this conversation as resolved.
Show resolved Hide resolved
(job_cancel_commands, commands, remove_user_commands) = slurm_user_accounts(vo_members, active_accounts, slurm_user_info, ["banette"])

self.assertEqual(set([tuple(x) for x in commands]), set([tuple(x) for x in [
shlex.split("/usr/bin/sacctmgr -i add user user6 Account=vo2 Cluster=banette DefaultAccount=vo2"),
Expand Down