Skip to content

Commit

Permalink
Add first unit tests for process_pool_worker
Browse files Browse the repository at this point in the history
The only change of note is to move the ArgumentParser to a function to enable
writing unit tests for it.  No user-functional change.
  • Loading branch information
khk-globus committed Feb 12, 2025
1 parent e724d13 commit eb03e75
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 50 deletions.
169 changes: 119 additions & 50 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,63 +809,132 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
return logger


if __name__ == "__main__":

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", action='store_true',
help="Enable logging at DEBUG level")
parser.add_argument("-a", "--addresses", default='',
help="Comma separated list of addresses at which the interchange could be reached")
parser.add_argument("--cert_dir", required=True,
help="Path to certificate directory.")
parser.add_argument("-l", "--logdir", default="process_worker_pool_logs",
help="Process worker pool log directory")
parser.add_argument("-u", "--uid", default=str(uuid.uuid4()).split('-')[-1],
help="Unique identifier string for Manager")
parser.add_argument("-b", "--block_id", default=None,
help="Block identifier for Manager")
parser.add_argument("-c", "--cores_per_worker", default="1.0",
help="Number of cores assigned to each worker process. Default=1.0")
parser.add_argument("-m", "--mem_per_worker", default=0,
help="GB of memory assigned to each worker process. Default=0, no assignment")
parser.add_argument("-t", "--task_port", required=True,
help="REQUIRED: Task port for receiving tasks from the interchange")
parser.add_argument("--max_workers_per_node", default=float('inf'),
help="Caps the maximum workers that can be launched, default:infinity")
parser.add_argument("-p", "--prefetch_capacity", default=0,
help="Number of tasks that can be prefetched to the manager. Default is 0.")
parser.add_argument("--hb_period", default=30,
help="Heartbeat period in seconds. Uses manager default unless set")
parser.add_argument("--hb_threshold", default=120,
help="Heartbeat threshold in seconds. Uses manager default unless set")
parser.add_argument("--drain_period", default=None,
help="Drain this pool after specified number of seconds. By default, does not drain.")
parser.add_argument("--address_probe_timeout", default=30,
help="Timeout to probe for viable address to interchange. Default: 30s")
parser.add_argument("--poll", default=10,
help="Poll period used in milliseconds")
parser.add_argument("-r", "--result_port", required=True,
help="REQUIRED: Result port for posting results to the interchange")
def get_arg_parser() -> argparse.ArgumentParser:

def strategyorlist(s: str):
allowed_strategies = ["none", "block", "alternating", "block-reverse"]
s = s.lower()
allowed_strategies = ("none", "block", "alternating", "block-reverse")
if s in allowed_strategies:
return s
elif s[0:4] == "list":
return s
else:
raise argparse.ArgumentTypeError("cpu-affinity must be one of {} or a list format".format(allowed_strategies))

parser.add_argument("--cpu-affinity", type=strategyorlist,
required=True,
help="Whether/how workers should control CPU affinity.")
parser.add_argument("--available-accelerators", type=str, nargs="*",
help="Names of available accelerators, if not given assumed to be zero accelerators available", default=[])
parser.add_argument("--enable_mpi_mode", action='store_true',
help="Enable MPI mode")
parser.add_argument("--mpi-launcher", type=str, choices=VALID_LAUNCHERS,
help="MPI launcher to use iff enable_mpi_mode=true")
err_msg = f"cpu-affinity must be one of {allowed_strategies} or a list format"
raise argparse.ArgumentTypeError(err_msg)

parser = argparse.ArgumentParser()
parser.add_argument(
"-d", "--debug", action='store_true', help="Enable logging at DEBUG level",
)
parser.add_argument(
"-a",
"--addresses",
default='',
help="Comma separated list of addresses at which the interchange could be reached",
)
parser.add_argument(
"--cert_dir", required=True, help="Path to certificate directory."
)
parser.add_argument(
"-l",
"--logdir",
default="process_worker_pool_logs",
help="Process worker pool log directory",
)
parser.add_argument(
"-u",
"--uid",
default=str(uuid.uuid4()).split('-')[-1],
help="Unique identifier string for Manager",
)
parser.add_argument(
"-b", "--block_id", default=None, help="Block identifier for Manager"
)
parser.add_argument(
"-c",
"--cores_per_worker",
default="1.0",
help="Number of cores assigned to each worker process. Default=1.0",
)
parser.add_argument(
"-m",
"--mem_per_worker",
default=0,
help="GB of memory assigned to each worker process. Default=0, no assignment",
)
parser.add_argument(
"-t",
"--task_port",
required=True,
help="Task port for receiving tasks from the interchange",
)
parser.add_argument(
"--max_workers_per_node",
default=float('inf'),
help="Caps the maximum workers that can be launched, default:infinity",
)
parser.add_argument(
"-p",
"--prefetch_capacity",
default=0,
help="Number of tasks that can be prefetched to the manager. Default is 0.",
)
parser.add_argument(
"--hb_period",
default=30,
help="Heartbeat period in seconds. Uses manager default unless set",
)
parser.add_argument(
"--hb_threshold",
default=120,
help="Heartbeat threshold in seconds. Uses manager default unless set",
)
parser.add_argument(
"--drain_period",
default=None,
help="Drain this pool after specified number of seconds. By default, does not drain.",
)
parser.add_argument(
"--address_probe_timeout",
default=30,
help="Timeout to probe for viable address to interchange. Default: 30s",
)
parser.add_argument(
"--poll", default=10, help="Poll period used in milliseconds"
)
parser.add_argument(
"-r",
"--result_port",
required=True,
help="Result port for posting results to the interchange",
)
parser.add_argument(
"--cpu-affinity",
type=strategyorlist,
required=True,
help="Whether/how workers should control CPU affinity.",
)
parser.add_argument(
"--available-accelerators",
type=str,
nargs="*",
default=[],
help="Names of available accelerators, if not given assumed to be zero accelerators available",
)
parser.add_argument(
"--enable_mpi_mode", action='store_true', help="Enable MPI mode"
)
parser.add_argument(
"--mpi-launcher",
type=str,
choices=VALID_LAUNCHERS,
help="MPI launcher to use iff enable_mpi_mode=true",
)

return parser


if __name__ == "__main__":
parser = get_arg_parser()
args = parser.parse_args()

os.makedirs(os.path.join(args.logdir, "block-{}".format(args.block_id), args.uid), exist_ok=True)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import sys
from argparse import ArgumentError

import pytest

from parsl.executors.high_throughput import process_worker_pool

if sys.version_info < (3, 12):
# exit_on_error bug; see https://github.com/python/cpython/issues/121018
# "argparse.ArgumentParser.parses_args does not honor exit_on_error=False when
# given unrecognized arguments"
pytest.skip(allow_module_level=True, reason="exit_on_error argparse bug")

# due to above pytest.skip, mypy on < Py312 thinks this is unreachable. :facepalm:
_known_required = ( # type: ignore[unreachable, unused-ignore]
"--cert_dir",
"--cpu-affinity",
"--result_port",
"--task_port",
)


@pytest.mark.local
def test_arg_parser_exits_on_error():
p = process_worker_pool.get_arg_parser()
assert p.exit_on_error


@pytest.mark.local
def test_arg_parser_known_required():
p = process_worker_pool.get_arg_parser()
reqd = [a for a in p._actions if a.required]
for a in reqd:
assert a.option_strings[-1] in _known_required, "Update _known_required?"


@pytest.mark.local
@pytest.mark.parametrize("req", _known_required)
def test_arg_parser_required(req):
p = process_worker_pool.get_arg_parser()
p.exit_on_error = False
with pytest.raises(ArgumentError) as pyt_exc:
p.parse_args([])

e_msg = pyt_exc.value.args[1]
assert req in e_msg


@pytest.mark.local
@pytest.mark.parametrize("valid,val", (
(True, "NoNe"),
(True, "none"),
(True, "block"),
(True, "alternating"),
(True, "block-reverse"),
(True, "list"),
(False, "asdf"),
(False, ""),
))
def test_arg_parser_validates_cpu_affinity(valid, val):
reqd_args = []
reqd_args.extend(("--cert_dir", "/some/path"))
reqd_args.extend(("--result_port", "123"))
reqd_args.extend(("--task_port", "123"))
reqd_args.extend(("--cpu-affinity", val))

p = process_worker_pool.get_arg_parser()
p.exit_on_error = False
if valid:
p.parse_args(reqd_args)
else:
with pytest.raises(ArgumentError) as pyt_exc:
p.parse_args(reqd_args)
assert "must be one of" in pyt_exc.value.args[1]

0 comments on commit eb03e75

Please sign in to comment.