From b795edae882c5693d50b9cf1d6eabb1900c13e27 Mon Sep 17 00:00:00 2001
From: Julian Geiger
Date: Mon, 17 Feb 2025 17:30:16 +0100
Subject: [PATCH] First version of renaming mirrored group paths and updating
 the DumpLogger on group relabel.

---
 src/aiida/cmdline/commands/cmd_group.py   |   7 ++
 src/aiida/cmdline/commands/cmd_profile.py |  47 ++++++--
 src/aiida/tools/dumping/logger.py         |  45 ++++++--
 src/aiida/tools/dumping/profile.py        | 132 +++++++++++++++++-----
 tests/tools/dumping/test_profile.py       |   6 +
 5 files changed, 192 insertions(+), 45 deletions(-)

diff --git a/src/aiida/cmdline/commands/cmd_group.py b/src/aiida/cmdline/commands/cmd_group.py
index 4f340b386a..2ef96c0ea5 100644
--- a/src/aiida/cmdline/commands/cmd_group.py
+++ b/src/aiida/cmdline/commands/cmd_group.py
@@ -319,12 +319,19 @@ def _dry_run_callback(pks):
 @with_dbenv()
 def group_relabel(group, label):
     """Change the label of a group."""
     try:
         group.label = label
     except UniquenessError as exception:
         echo.echo_critical(str(exception))
     else:
         echo.echo_success(f"Label changed to '{label}'")
+        msg = (
+            'Note that if you are mirroring your profile data to disk, run `verdi profile mirror --update-groups` '
+            'to reflect the relabeling of the group in the mirrored directory tree.'
+        )
+        echo.echo_report(msg)
 
 
 @verdi_group.command('description')
diff --git a/src/aiida/cmdline/commands/cmd_profile.py b/src/aiida/cmdline/commands/cmd_profile.py
index 55077dd594..07973a7d56 100644
--- a/src/aiida/cmdline/commands/cmd_profile.py
+++ b/src/aiida/cmdline/commands/cmd_profile.py
@@ -303,6 +303,14 @@ def profile_delete(force, delete_data, profiles):
     show_default=True,
     help='If a top-level process calls sub-processes, create a designated directory only for the top-level process.',
 )
+@click.option(
+    '--update-groups/--no-update-groups',
+    default=False,
+    show_default=True,
+    help='Update mirrored group directories if groups have been relabeled or nodes reorganized between groups.',
+)
 @options.INCLUDE_INPUTS()
 @options.INCLUDE_OUTPUTS()
 @options.INCLUDE_ATTRIBUTES()
@@ -319,6 +327,7 @@ def profile_mirror(
     organize_by_groups,
     symlink_duplicates,
     delete_missing,
+    update_groups,
     only_top_level_calcs,
     only_top_level_workflows,
     include_inputs,
@@ -343,6 +352,11 @@ def profile_mirror(
     # ? Does it even make sense to provide both options, as they are mutually exclusive?
     incremental = not overwrite
 
+    if not organize_by_groups and update_groups:
+        msg = '`--update-groups` was selected, but it requires `--organize-by-groups` to be enabled.'
+        echo.echo_critical(msg)
+
     if path is None:
         path = Path.cwd() / f'{profile.name}-mirror'
 
@@ -421,6 +435,9 @@ def profile_mirror(
     num_processes_to_dump = len(profile_dumper.processes_to_dump)
     num_processes_to_delete = len(profile_dumper.processes_to_delete)
 
+    # num_groups_to_dump = len(profile_dumper.groups_to_dump)
+    num_groups_to_delete = len(profile_dumper.groups_to_delete)
+
     if dry_run:
         dry_run_message = (
             f'Dry run for mirroring of profile `{profile.name}`. '
@@ -438,23 +455,35 @@ def profile_mirror(
         # TODO: Maybe add y/n confirmation here?
         echo.echo_report(msg)
 
-    if num_processes_to_dump == 0:
-        msg = 'No processes to dump.'
-        echo.echo_success(msg)
-    else:
-        profile_dumper.dump_processes()
-        msg = f'Dumped {num_processes_to_dump} new nodes.'
-        echo.echo_success(msg)
+    if dump_processes:
+        if num_processes_to_dump == 0:
+            msg = 'No processes to dump.'
+            echo.echo_success(msg)
+        else:
+            profile_dumper.dump_processes()
+            msg = f'Dumped {num_processes_to_dump} new nodes.'
+            echo.echo_success(msg)
 
     if delete_missing:
-        # breakpoint()
         if num_processes_to_delete == 0:
             echo.echo_success('No processes to delete.')
         else:
             profile_dumper.delete_processes()
             echo.echo_success(f'Deleted {num_processes_to_delete} node directories.')
 
+        if num_groups_to_delete == 0:
+            echo.echo_success('No groups to delete.')
+        else:
+            profile_dumper.delete_groups()
+            echo.echo_success(f'Deleted {num_groups_to_delete} group directories.')
+
+    if update_groups:
+        relabeled_paths = profile_dumper.update_groups()
+
+        msg = f'Renamed {len(relabeled_paths)} group directories to match the current group labels.'
+        echo.echo_success(msg)
+        echo.echo_report(f'Relabeled group paths: {relabeled_paths}')
+
     # Append the current dump time to dumping safeguard file
     with safeguard_file_path.open('a') as fhandle:
         fhandle.write(f'Last profile mirror time: {last_dump_time.isoformat()}\n')
diff --git a/src/aiida/tools/dumping/logger.py b/src/aiida/tools/dumping/logger.py
index 99f7deda6c..24adcbcaf3 100644
--- a/src/aiida/tools/dumping/logger.py
+++ b/src/aiida/tools/dumping/logger.py
@@ -15,6 +15,9 @@
 
 from aiida.common.exceptions import NotExistent
 
+# TODO: Possibly mirror hierarchy of mirrored directory inside json file
+# TODO: Currently, json file has only top-level "groups", "workflows", and "calculations"
+
 
 @dataclass
 class DumpLog:
@@ -94,10 +97,12 @@ class DumpLogStoreCollection:
 
     calculations: DumpLogStore
     workflows: DumpLogStore
+    groups: DumpLogStore
+    # data: DumpLogStore
 
 
 class DumpLogger:
-    """Main logger class using dataclasses for better structure."""
+    """Main dumping logger singleton."""
 
     DUMP_LOG_FILE: str = '.dump_log.json'
     _instance: 'DumpLogger | None' = None  # Class-level singleton instance
@@ -116,7 +121,8 @@ def __init__(
         dump_parent_path: Path | None = None,
         calculations: DumpLogStore | None = None,
         workflows: DumpLogStore | None = None,
-        # counter: int = 0,
+        groups: DumpLogStore | None = None,
+        # data: DumpLogStore | None = None,
     ) -> None:
         # Ensure __init__ is only called once
         if hasattr(self, '_initialized') and self._initialized:
@@ -125,7 +131,8 @@ def __init__(
         self.dump_parent_path = dump_parent_path or Path.cwd()
         self.calculations = calculations or DumpLogStore()
         self.workflows = workflows or DumpLogStore()
-        # self.counter = counter
+        self.groups = groups or DumpLogStore()
+        # self.data = data or DumpLogStore()
 
         # Mark the object as initialized
         self._initialized = True
@@ -144,7 +151,7 @@ def del_entry(self, store: DumpLogStore, uuid: str) -> bool:
     @property
     def log(self) -> DumpLogStoreCollection:
         """Retrieve the current state of the log as a dataclass."""
-        return DumpLogStoreCollection(calculations=self.calculations, workflows=self.workflows)
+        return DumpLogStoreCollection(calculations=self.calculations, workflows=self.workflows, groups=self.groups)
 
     def save_log(self) -> None:
         """Save the log to a JSON file."""
@@ -152,12 +159,18 @@ def serialize_logs(container: DumpLogStore) -> dict:
             serialized = {}
             for uuid, entry in container.entries.items():
-                serialized[uuid] = {'path': str(entry.path), 'time': entry.time.isoformat()}
+                serialized[uuid] = {
+                    'path': str(entry.path),
+                    'time': entry.time.isoformat(),
+                    'links': [str(link) for link in entry.links],
+                }
             return serialized
 
         log_dict = {
             'calculations': serialize_logs(self.calculations),
             'workflows': serialize_logs(self.workflows),
+            'groups': serialize_logs(self.groups),
+            # 'data': serialize_logs(self.data),
         }
 
         with self.log_file_path.open('w', encoding='utf-8') as f:
@@ -185,12 +198,20 @@ def deserialize_logs(category_data: dict) -> DumpLogStore:
             container = DumpLogStore()
             for uuid, entry in category_data.items():
                 container.add_entry(
-                    uuid, DumpLog(path=Path(entry['path']), time=datetime.fromisoformat(entry['time']))
+                    uuid,
+                    DumpLog(
+                        path=Path(entry['path']),
+                        time=datetime.fromisoformat(entry['time']),
+                        links=[Path(p) for p in entry.get('links', [])],
+                    ),
                 )
+            return container
 
         instance.calculations = deserialize_logs(data['calculations'])
         instance.workflows = deserialize_logs(data['workflows'])
+        instance.groups = deserialize_logs(data['groups'])
+        # instance.data = deserialize_logs(data['data'])
 
     except (json.JSONDecodeError, OSError):
         raise
@@ -206,7 +227,7 @@ def get_store_by_uuid(self, uuid: str) -> DumpLogStore:
             if uuid in store.entries:
                 return store
 
-        msg = f"No corresponding `DumpLogStore` found for UUID: `{uuid}`."
+        msg = f'No corresponding `DumpLogStore` found for UUID: `{uuid}`.'
         raise NotExistent(msg)
 
     def get_path_by_uuid(self, uuid: str) -> Path | None:
@@ -215,13 +236,17 @@ def get_path_by_uuid(self, uuid: str) -> Path | None:
 
         try:
             current_store = self.get_store_by_uuid(uuid=uuid)
-            path = current_store.entries[uuid].path
-            return path
         except NotExistent as exc:
             raise NotExistent(exc.args[0]) from exc
 
+        try:
+            path = current_store.entries[uuid].path
+            return path
         except KeyError as exc:
-            msg = f"UUID: `{uuid}` not contained in store `{current_store}`."
+            msg = f'UUID: `{uuid}` not contained in store `{current_store}`.'
             raise KeyError(msg) from exc
         except:  # For debugging
             raise
diff --git a/src/aiida/tools/dumping/profile.py b/src/aiida/tools/dumping/profile.py
index 945b199e93..c63c719e81 100644
--- a/src/aiida/tools/dumping/profile.py
+++ b/src/aiida/tools/dumping/profile.py
@@ -12,8 +12,9 @@
 
 from __future__ import annotations
 
-from typing import cast
+from datetime import datetime
 from pathlib import Path
+from typing import cast
 
 from aiida import orm
 from aiida.common.log import AIIDA_LOGGER
@@ -22,9 +23,9 @@
 from aiida.tools.dumping.base import BaseDumper
 from aiida.tools.dumping.collection import CollectionDumper
 from aiida.tools.dumping.config import BaseDumpConfig, ProfileDumpConfig
-from aiida.tools.dumping.logger import DumpLogger
+from aiida.tools.dumping.logger import DumpLog, DumpLogger
 from aiida.tools.dumping.process import ProcessDumper
-from aiida.tools.dumping.utils import filter_nodes_last_dump_time, load_given_group, safe_delete_dir, get_group_subpath
+from aiida.tools.dumping.utils import filter_nodes_last_dump_time, get_group_subpath, load_given_group, safe_delete_dir
 
 logger = AIIDA_LOGGER.getChild('tools.dumping')
 
@@ -54,7 +55,7 @@ def __init__(
         super().__init__(base_dump_config=base_dump_config, dump_logger=dump_logger)
 
         if groups is not None:
-            self.groups = [load_given_group(g) for g in groups]
+            self.groups = [load_given_group(group=g) for g in groups]
         else:
             self.groups = []
 
@@ -76,6 +77,9 @@ def __init__(
         self._processes_to_dump: list[str] | None = None
         self._processes_to_delete: list[str] | None = None
 
+        # self._groups_to_dump: list[str] | None = None
+        self._groups_to_delete: list[str] | None = None
+
     def _dump_processes_not_in_any_group(self) -> None:
         """Dump the profile's process data not contained in any group."""
 
@@ -105,6 +109,8 @@ def _dump_processes_per_group(self, groups: list[orm.Group]) -> None:
 
         :param groups: List of ``orm.Group`` entities.
         """
+        group_store = self.dump_logger.groups
+
         for group in groups:
             if self.organize_by_groups:
                 group_subpath = get_group_subpath(group=group)
@@ -125,6 +131,11 @@ def _dump_processes_per_group(self, groups: list[orm.Group]) -> None:
                 logger.report(msg)
 
             group_dumper.dump(output_path=output_path)
 
+            group_store.add_entry(
+                uuid=group.uuid,
+                entry=DumpLog(path=output_path, time=datetime.now().astimezone()),
+            )
+
     def _get_no_group_processes(self) -> list[str]:
         """Obtain nodes in the profile that are not part of any group.
 
@@ -166,11 +177,11 @@ def dump_processes(self):
             # Still, even without selecting groups, by default, all profile data should be dumped
             # Thus, we obtain all groups in the profile here
-            profile_groups = orm.QueryBuilder().append(orm.Group).all(flat=True)
+            profile_groups = cast(list[orm.Group], orm.QueryBuilder().append(orm.Group).all(flat=True))
             self._dump_processes_per_group(groups=profile_groups)
 
         else:
-            self._dump_processes_per_group(groups=self.groups)
+            self._dump_processes_per_group(groups=[g for g in self.groups if g is not None])
 
     @property
     def processes_to_dump(self) -> list[str]:
@@ -190,6 +201,28 @@ def _get_processes_to_dump(self) -> list[str]:
 
         return profile_processes
 
+    @property
+    def groups_to_delete(self) -> list[str]:
+        if not self.delete_missing:
+            return []
+        if self._groups_to_delete is None:
+            self._groups_to_delete = self._get_groups_to_delete()
+        return self._groups_to_delete
+
+    def _get_groups_to_delete(self) -> list[str]:
+        dump_logger = self.dump_logger
+        log = dump_logger.log
+
+        # Cannot use the QueryBuilder here because deleted groups are no longer in the DB
+        dumped_uuids = set(log.groups.entries.keys())
+
+        profile_uuids = cast(set[str], set(orm.QueryBuilder().append(orm.Group, project=['uuid']).all(flat=True)))
+
+        to_delete_uuids = list(dumped_uuids - profile_uuids)
+
+        return to_delete_uuids
+
     @property
     def processes_to_delete(self) -> list[str]:
         if not self.delete_missing:
@@ -205,23 +238,19 @@ def _get_processes_to_delete(self) -> list[str]:
 
         # Cannot use QB here because, when node deleted, it's not in the DB anymore
         dumped_uuids = set(list(log.calculations.entries.keys()) + list(log.workflows.entries.keys()))
-        breakpoint()
-
-        # TODO: Possibly filter here since last dump time
-        # TODO: But it is highly likely that the last dump command with deletion was run a while ago
-        # TODO: So I cannot filter by last dump time, but should probably take the whole set
-        # TODO: Don't
-        profile_uuids = cast(
-            set[str], set(orm.QueryBuilder().append(orm.ProcessNode, project=['uuid']).all(flat=True))
-        )
 
+        # One could filter by the last dump time here, but the last dump command with deletion may
+        # have been run long ago, so the full set of previously dumped UUIDs is compared against
+        # the profile instead
+        profile_uuids = cast(set[str], set(orm.QueryBuilder().append(orm.ProcessNode, project=['uuid']).all(flat=True)))
+        # Don't restrict this to only top-level processes, as the file paths of sub-processes are also
+        # created and stored in the log
         # profile_uuids = set([process.uuid for process in profile_processes if process.caller is None])
-        # profile_uuids = set([process.uuid for process in profile_processes])
 
         to_delete_uuids = list(dumped_uuids - profile_uuids)
 
         return to_delete_uuids
 
-    def _delete_missing_node_dir(self, to_delete_uuid) -> None:
+    def _delete_missing_node_dir(self, to_delete_uuid: str) -> None:
         # TODO: Possibly make a delete method for the path and the log, and then call that in the loop
 
         dump_logger = self.dump_logger
 
@@ -234,15 +263,17 @@ def _delete_missing_node_dir(self, to_delete_uuid) -> None:
 
         # breakpoint()
         path_to_delete = dump_logger.get_path_by_uuid(uuid=to_delete_uuid)
-
-        try:
-            safe_delete_dir(path_to_validate=path_to_delete, safeguard_file='.aiida_node_metadata.yaml', verbose=False)
-            current_store.del_entry(uuid=to_delete_uuid)
-        except:
-            raise
+        if path_to_delete is not None:
+            safe_delete_dir(
+                path_to_validate=path_to_delete, safeguard_file='.aiida_node_metadata.yaml', verbose=False
+            )
+            current_store.del_entry(uuid=to_delete_uuid)
 
     def delete_processes(self):
-        to_dump_processes = self.processes_to_dump
+        # to_dump_processes = self.processes_to_dump
         to_delete_processes = self.processes_to_delete
 
         # print(f'TO_DUMP_PROCESSES: {to_dump_processes}')
@@ -252,5 +283,54 @@ def delete_processes(self):
         for to_delete_uuid in to_delete_processes:
             self._delete_missing_node_dir(to_delete_uuid=to_delete_uuid)
 
-        # TODO: Need to also delete entry from the log when I delete the dir
-        # TODO: Add also logging for node/path deletion
+        # TODO: Add also logging for node/path deletion?
+
+    def delete_groups(self):
+        to_delete_groups = self.groups_to_delete
+        for to_delete_uuid in to_delete_groups:
+            self._delete_missing_node_dir(to_delete_uuid=to_delete_uuid)
+            # ! Problem: Don't have safeguard file in empty group directory
+
+    def update_groups(self) -> list[dict[str, Path]]:
+        dump_logger = self.dump_logger
+
+        # Order is the same as in the mirroring log file -> Not using a profile QB here
+        # Also, if the group is new (and contains new nodes), it will be dumped anyway
+        dumped_group_uuids = list(dump_logger.groups.entries.keys())
+
+        old_mapping: dict[str, Path] = dict(
+            zip(dumped_group_uuids, [p.path for p in dump_logger.groups.entries.values()])
+        )
+
+        new_mapping: dict[str, Path] = dict(
+            zip(
+                dumped_group_uuids,
+                [self.dump_parent_path / 'groups' / get_group_subpath(orm.load_group(g)) for g in dumped_group_uuids],
+            )
+        )
+
+        modified_paths: list[dict[str, Path]] = []
+
+        for uuid, old_path in old_mapping.items():
+            new_path = new_mapping.get(uuid)
+
+            if new_path and old_path != new_path:
+                # logger.report(f'Renaming {old_path} -> {new_path}')
+                old_path.rename(new_path)
+                dump_logger.groups.entries[uuid].path = new_path
+
+                modified_paths.append(
+                    {
+                        'old': old_path,
+                        'new': new_path,
+                    }
+                )
+
+        return modified_paths
diff --git a/tests/tools/dumping/test_profile.py b/tests/tools/dumping/test_profile.py
index 574688d8ae..01e765ef43 100644
--- a/tests/tools/dumping/test_profile.py
+++ b/tests/tools/dumping/test_profile.py
@@ -7,3 +7,9 @@
 # For further information please visit http://www.aiida.net               #
 ###########################################################################
 """Tests for the dumping of profile data to disk."""
+
+
+def test_delete_missing_group_nodes_retained(): ...
+
+
+def test_delete_missing_group_nodes_deleted(): ...
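
Editor's note (not part of the patch): the core of `update_groups` above is the old-path -> new-path reconciliation for mirrored group directories. Below is a minimal, self-contained sketch of that bookkeeping using only `pathlib` and plain dicts; the function name `reconcile_group_paths` and the `__main__` demo are hypothetical stand-ins for the `DumpLogger` and ORM machinery used in the actual patch.

from pathlib import Path


def reconcile_group_paths(
    old_mapping: dict[str, Path],
    new_mapping: dict[str, Path],
) -> list[dict[str, Path]]:
    """Rename mirrored group directories whose target path changed after a group relabel.

    :param old_mapping: group UUID -> directory path recorded at dump time.
    :param new_mapping: group UUID -> directory path derived from the current group label.
    :return: one ``{'old': ..., 'new': ...}`` entry per directory that was renamed.
    """
    modified_paths: list[dict[str, Path]] = []
    for uuid, old_path in old_mapping.items():
        new_path = new_mapping.get(uuid)
        if new_path is None or new_path == old_path:
            continue  # unknown group or unchanged label: nothing to rename
        new_path.parent.mkdir(parents=True, exist_ok=True)
        old_path.rename(new_path)
        modified_paths.append({'old': old_path, 'new': new_path})
    return modified_paths


if __name__ == '__main__':
    import tempfile

    # Simulate a mirror with one dumped group whose label changed from `old-label` to `new-label`.
    groups_root = Path(tempfile.mkdtemp()) / 'groups'
    (groups_root / 'old-label').mkdir(parents=True)

    renamed = reconcile_group_paths(
        old_mapping={'uuid-1': groups_root / 'old-label'},
        new_mapping={'uuid-1': groups_root / 'new-label'},
    )
    print(renamed)  # [{'old': .../old-label, 'new': .../new-label}]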