Skip to content

Commit

Permalink
Add command to get installed packages on managers
Browse files Browse the repository at this point in the history
Added a `MANAGERS_PACKAGES` HTEX interchange command and associated HTEX
method to retrieve a dict mapping each manager ID to a dict of installed
packages and their versions. This is useful for debugging and monitoring
worker environments.

We gather the package information from the manager upon registration,
but do not include it in the standard `MANAGERS` command, which runs
every 5 seconds. This avoids unnecessary overhead.
  • Loading branch information
rjmello committed Feb 14, 2025
1 parent 91c9b48 commit 668aa6d
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 1 deletion.
6 changes: 6 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,12 @@ def connected_managers(self) -> List[Dict[str, typing.Any]]:
"""
return self.command_client.run("MANAGERS")

def connected_managers_packages(self) -> Dict[str, Dict[str, str]]:
"""Returns a dict mapping each manager ID to a dict of installed
packages and their versions
"""
return self.command_client.run("MANAGERS_PACKAGES")

def connected_blocks(self) -> List[str]:
"""List of connected block ids"""
return self.command_client.run("CONNECTED_BLOCKS")
Expand Down
7 changes: 7 additions & 0 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) ->
'draining': m['draining']}
reply.append(resp)

elif command_req == "MANAGERS_PACKAGES":
reply = {}
for manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
manager_id_str = manager_id.decode('utf-8')
reply[manager_id_str] = m["packages"]

elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/manager_record.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, List, Optional
from typing import Any, Dict, List, Optional

from typing_extensions import TypedDict

Expand All @@ -18,3 +18,4 @@ class ManagerRecord(TypedDict, total=False):
timestamp: datetime
parsl_version: str
python_version: str
packages: Dict[str, str]
2 changes: 2 additions & 0 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import threading
import time
import uuid
from importlib.metadata import distributions
from multiprocessing.managers import DictProxy
from multiprocessing.sharedctypes import Synchronized
from typing import Dict, List, Optional, Sequence
Expand Down Expand Up @@ -265,6 +266,7 @@ def create_reg_message(self):
'python_v': "{}.{}.{}".format(sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro),
'packages': {dist.metadata['Name']: dist.version for dist in distributions()},
'worker_count': self.worker_count,
'uid': self.uid,
'block_id': self.block_id,
Expand Down
17 changes: 17 additions & 0 deletions parsl/tests/test_htex/test_managers_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import parsl
from parsl.app.app import python_app
from parsl.tests.configs.htex_local import fresh_config
from parsl.version import VERSION as PARSL_VERSION


def local_config():
Expand Down Expand Up @@ -34,3 +35,19 @@ def test_connected_managers():
assert 'parsl_version' in manager_info
assert manager_info['parsl_version'] == parsl.__version__
assert manager_info['python_version'] == f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"


@pytest.mark.local
def test_connected_managers_packages():
# Run dummy function to ensure a manager is online
f = dummy()
assert f.result() is None

htex: parsl.HighThroughputExecutor = parsl.dfk().executors['htex_local']
managers_info_list = htex.connected_managers()
managers_packages = htex.connected_managers_packages()

assert len(managers_packages) == len(managers_info_list) == 1
manager_id, packages = list(managers_packages.items())[0]
assert manager_id == managers_info_list[0]['manager']
assert packages['parsl'] == PARSL_VERSION

0 comments on commit 668aa6d

Please sign in to comment.