Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first unit tests for process_worker_pool #3766

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 119 additions & 50 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,63 +809,132 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
return logger


if __name__ == "__main__":

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", action='store_true',
help="Enable logging at DEBUG level")
parser.add_argument("-a", "--addresses", default='',
help="Comma separated list of addresses at which the interchange could be reached")
parser.add_argument("--cert_dir", required=True,
help="Path to certificate directory.")
parser.add_argument("-l", "--logdir", default="process_worker_pool_logs",
help="Process worker pool log directory")
parser.add_argument("-u", "--uid", default=str(uuid.uuid4()).split('-')[-1],
help="Unique identifier string for Manager")
parser.add_argument("-b", "--block_id", default=None,
help="Block identifier for Manager")
parser.add_argument("-c", "--cores_per_worker", default="1.0",
help="Number of cores assigned to each worker process. Default=1.0")
parser.add_argument("-m", "--mem_per_worker", default=0,
help="GB of memory assigned to each worker process. Default=0, no assignment")
parser.add_argument("-t", "--task_port", required=True,
help="REQUIRED: Task port for receiving tasks from the interchange")
parser.add_argument("--max_workers_per_node", default=float('inf'),
help="Caps the maximum workers that can be launched, default:infinity")
parser.add_argument("-p", "--prefetch_capacity", default=0,
help="Number of tasks that can be prefetched to the manager. Default is 0.")
parser.add_argument("--hb_period", default=30,
help="Heartbeat period in seconds. Uses manager default unless set")
parser.add_argument("--hb_threshold", default=120,
help="Heartbeat threshold in seconds. Uses manager default unless set")
parser.add_argument("--drain_period", default=None,
help="Drain this pool after specified number of seconds. By default, does not drain.")
parser.add_argument("--address_probe_timeout", default=30,
help="Timeout to probe for viable address to interchange. Default: 30s")
parser.add_argument("--poll", default=10,
help="Poll period used in milliseconds")
parser.add_argument("-r", "--result_port", required=True,
help="REQUIRED: Result port for posting results to the interchange")
def get_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the process worker pool.

    Building the parser in a function (instead of inline under
    ``if __name__ == "__main__"``) lets it be constructed and inspected
    without running the worker pool itself.

    Required options: ``--cert_dir``, ``-t/--task_port``,
    ``-r/--result_port`` and ``--cpu-affinity``.

    Note that most options declare no ``type=``: a value supplied on the
    command line arrives as a ``str``, while an omitted option yields the
    Python literal given as ``default`` (e.g. ``0``, ``30``,
    ``float('inf')``).  Callers must convert as needed.

    Returns:
        A fully configured ``argparse.ArgumentParser``.
    """

    def strategyorlist(s: str) -> str:
        """Validate a ``--cpu-affinity`` value (used as argparse ``type``).

        The value is lower-cased and accepted when it is one of the named
        strategies or starts with ``"list"``.

        Returns:
            The lower-cased value.

        Raises:
            argparse.ArgumentTypeError: if the value is neither a known
                strategy nor starts with ``"list"``.
        """
        s = s.lower()
        allowed_strategies = ("none", "block", "alternating", "block-reverse")
        if s in allowed_strategies or s.startswith("list"):
            return s
        err_msg = f"cpu-affinity must be one of {allowed_strategies} or a list format"
        raise argparse.ArgumentTypeError(err_msg)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d", "--debug", action='store_true', help="Enable logging at DEBUG level",
    )
    parser.add_argument(
        "-a",
        "--addresses",
        default='',
        help="Comma separated list of addresses at which the interchange could be reached",
    )
    parser.add_argument(
        "--cert_dir", required=True, help="Path to certificate directory."
    )
    parser.add_argument(
        "-l",
        "--logdir",
        default="process_worker_pool_logs",
        help="Process worker pool log directory",
    )
    parser.add_argument(
        "-u",
        "--uid",
        # Default is the last dash-separated group of a fresh random UUID,
        # computed once, when the parser is built.
        default=str(uuid.uuid4()).split('-')[-1],
        help="Unique identifier string for Manager",
    )
    parser.add_argument(
        "-b", "--block_id", default=None, help="Block identifier for Manager"
    )
    parser.add_argument(
        "-c",
        "--cores_per_worker",
        default="1.0",
        help="Number of cores assigned to each worker process. Default=1.0",
    )
    parser.add_argument(
        "-m",
        "--mem_per_worker",
        default=0,
        help="GB of memory assigned to each worker process. Default=0, no assignment",
    )
    parser.add_argument(
        "-t",
        "--task_port",
        required=True,
        help="Task port for receiving tasks from the interchange",
    )
    parser.add_argument(
        "--max_workers_per_node",
        default=float('inf'),
        help="Caps the maximum workers that can be launched, default:infinity",
    )
    parser.add_argument(
        "-p",
        "--prefetch_capacity",
        default=0,
        help="Number of tasks that can be prefetched to the manager. Default is 0.",
    )
    parser.add_argument(
        "--hb_period",
        default=30,
        help="Heartbeat period in seconds. Uses manager default unless set",
    )
    parser.add_argument(
        "--hb_threshold",
        default=120,
        help="Heartbeat threshold in seconds. Uses manager default unless set",
    )
    parser.add_argument(
        "--drain_period",
        default=None,
        help="Drain this pool after specified number of seconds. By default, does not drain.",
    )
    parser.add_argument(
        "--address_probe_timeout",
        default=30,
        help="Timeout to probe for viable address to interchange. Default: 30s",
    )
    parser.add_argument(
        "--poll", default=10, help="Poll period used in milliseconds"
    )
    parser.add_argument(
        "-r",
        "--result_port",
        required=True,
        help="Result port for posting results to the interchange",
    )
    parser.add_argument(
        "--cpu-affinity",
        type=strategyorlist,
        required=True,
        help="Whether/how workers should control CPU affinity.",
    )
    parser.add_argument(
        "--available-accelerators",
        type=str,
        nargs="*",
        default=[],
        help="Names of available accelerators, if not given assumed to be zero accelerators available",
    )
    parser.add_argument(
        "--enable_mpi_mode", action='store_true', help="Enable MPI mode"
    )
    parser.add_argument(
        "--mpi-launcher",
        type=str,
        choices=VALID_LAUNCHERS,
        help="MPI launcher to use iff enable_mpi_mode=true",
    )

    return parser


if __name__ == "__main__":
parser = get_arg_parser()
args = parser.parse_args()

os.makedirs(os.path.join(args.logdir, "block-{}".format(args.block_id), args.uid), exist_ok=True)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import sys
from argparse import ArgumentError

import pytest

from parsl.executors.high_throughput import process_worker_pool

if sys.version_info < (3, 12):
    # Older interpreters are affected by an argparse bug where parse_args()
    # does not honor exit_on_error=False in every error path; see
    # https://github.com/python/cpython/issues/121018
    # The tests below rely on that flag, so skip the whole module.
    pytest.skip(reason="exit_on_error argparse bug", allow_module_level=True)

# Options the worker pool parser is expected to mark as required.
# Because of the module-level skip above, mypy on Python < 3.12 considers the
# following assignment unreachable, hence the ignore.
_known_required = (  # type: ignore[unreachable, unused-ignore]
    "--cert_dir",
    "--cpu-affinity",
    "--result_port",
    "--task_port",
)


@pytest.mark.local
def test_arg_parser_exits_on_error():
    """The parser returned by get_arg_parser() has exit_on_error enabled."""
    assert process_worker_pool.get_arg_parser().exit_on_error


@pytest.mark.local
def test_arg_parser_known_required():
    """Every option the parser marks required must be listed in _known_required."""
    parser = process_worker_pool.get_arg_parser()
    # Use the last option string of each required action (the long form
    # when both a short and a long form are registered).
    required_flags = [
        action.option_strings[-1]
        for action in parser._actions
        if action.required
    ]
    for flag in required_flags:
        assert flag in _known_required, "Update _known_required?"


@pytest.mark.local
@pytest.mark.parametrize("req", _known_required)
def test_arg_parser_required(req):
    """Parsing an empty command line reports each known-required option."""
    parser = process_worker_pool.get_arg_parser()
    # With exit_on_error disabled the test expects an ArgumentError rather
    # than a process exit.
    parser.exit_on_error = False

    with pytest.raises(ArgumentError) as exc_info:
        parser.parse_args([])

    # The second element of the exception's args is checked for the option name.
    message = exc_info.value.args[1]
    assert req in message


@pytest.mark.local
@pytest.mark.parametrize("valid,val", (
    (True, "NoNe"),
    (True, "none"),
    (True, "block"),
    (True, "alternating"),
    (True, "block-reverse"),
    (True, "list"),
    (False, "asdf"),
    (False, ""),
))
def test_arg_parser_validates_cpu_affinity(valid, val):
    """--cpu-affinity accepts the listed valid values and rejects the others."""
    # Supply every required option so that only --cpu-affinity can fail.
    argv = [
        "--cert_dir", "/some/path",
        "--result_port", "123",
        "--task_port", "123",
        "--cpu-affinity", val,
    ]

    parser = process_worker_pool.get_arg_parser()
    parser.exit_on_error = False

    if valid:
        # Must parse without raising.
        parser.parse_args(argv)
        return

    with pytest.raises(ArgumentError) as exc_info:
        parser.parse_args(argv)
    assert "must be one of" in exc_info.value.args[1]
Loading