Skip to content

Commit

Permalink
SqliteDosStorage: Implement the storage backup
Browse files — browse the repository at this point in the history
  • Loading branch information
sphuber committed May 3, 2024
1 parent 6b6e152 commit de0ed6b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 27 deletions.
9 changes: 9 additions & 0 deletions src/aiida/orm/implementation/storage_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,18 @@ def backup(
:raises StorageBackupError: If an error occurred during the backup procedure.
:raises NotImplementedError: If the storage backend doesn't implement a backup procedure.
"""
from aiida.common.exceptions import LockedProfileError, StorageBackupError
from aiida.manage.configuration.settings import DEFAULT_CONFIG_FILE_NAME
from aiida.manage.profile_access import ProfileAccessManager
from aiida.storage.log import STORAGE_LOGGER

# check that the AiiDA profile is not locked and request access for the duration of this backup process
# (locked means that possibly a maintenance operation is running that could interfere with the backup)
try:
ProfileAccessManager(self._profile).request_access()
except LockedProfileError as exc:
raise StorageBackupError(f'{self._profile} is locked!') from exc

backup_manager = self._validate_or_init_backup_folder(dest, keep)

try:
Expand Down
9 changes: 0 additions & 9 deletions src/aiida/storage/psql_dos/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,6 @@ def _backup_storage(
import subprocess
import tempfile

from aiida.manage.profile_access import ProfileAccessManager

STORAGE_LOGGER.report('Starting backup...')

# This command calls `rsync` and `pg_dump` executables. check that they are in PATH
Expand All @@ -518,13 +516,6 @@ def _backup_storage(
cfg = self._profile.storage_config
container = Container(get_filepath_container(self.profile))

# check that the AiiDA profile is not locked and request access for the duration of this backup process
# (locked means that possibly a maintenance operation is running that could interfere with the backup)
try:
ProfileAccessManager(self._profile).request_access()
except exceptions.LockedProfileError as exc:
raise exceptions.StorageBackupError('The profile is locked!') from exc

# step 1: first run the storage maintenance version that can safely be performed while aiida is running
STORAGE_LOGGER.report('Running basic maintenance...')
self.maintain(full=False, compress=False)
Expand Down
78 changes: 60 additions & 18 deletions src/aiida/storage/sqlite_dos/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
from typing import TYPE_CHECKING, Optional
from uuid import uuid4

from disk_objectstore import Container
from disk_objectstore import Container, backup_utils
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import insert
from sqlalchemy.orm import scoped_session, sessionmaker

from aiida.common import exceptions
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import Profile
from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER
Expand All @@ -40,6 +41,8 @@
__all__ = ('SqliteDosStorage',)

LOGGER = AIIDA_LOGGER.getChild(__file__)
# Fixed names of the storage artifacts that live directly inside the storage root directory.
FILENAME_DATABASE = 'database.sqlite'
FILENAME_CONTAINER = 'container'


class SqliteDosMigrator(PsqlDosMigrator):
Expand All @@ -52,7 +55,7 @@ class SqliteDosMigrator(PsqlDosMigrator):
"""

def __init__(self, profile: Profile) -> None:
filepath_database = Path(profile.storage_config['filepath']) / 'database.sqlite'
filepath_database = Path(profile.storage_config['filepath']) / FILENAME_DATABASE
filepath_database.touch()

self.profile = profile
Expand All @@ -64,7 +67,7 @@ def get_container(self) -> Container:
:returns: The disk-object store container configured for the repository path of the current profile.
"""
filepath_container = Path(self.profile.storage_config['filepath']) / 'container'
filepath_container = Path(self.profile.storage_config['filepath']) / FILENAME_CONTAINER
return Container(str(filepath_container))

def initialise_database(self) -> None:
Expand Down Expand Up @@ -112,6 +115,18 @@ def filepath_is_absolute(cls, value: str) -> str:
"""Return the resolved and absolute filepath."""
return str(Path(value).resolve().absolute())

@property
def filepath_root(self) -> Path:
    """Return the root directory of the storage, taken from the profile's ``filepath`` storage config option."""
    return Path(self.profile.storage_config['filepath'])

@property
def filepath_container(self) -> Path:
    """Return the filepath of the disk-objectstore container directory inside the storage root."""
    return self.filepath_root / FILENAME_CONTAINER

@property
def filepath_database(self) -> Path:
    """Return the filepath of the sqlite database file inside the storage root."""
    return self.filepath_root / FILENAME_DATABASE

@classmethod
def initialise(cls, profile: Profile, reset: bool = False) -> bool:
filepath = Path(profile.storage_config['filepath'])
Expand All @@ -132,7 +147,7 @@ def initialise(cls, profile: Profile, reset: bool = False) -> bool:

def __str__(self) -> str:
state = 'closed' if self.is_closed else 'open'
return f'SqliteDosStorage[{self._profile.storage_config["filepath"]}]: {state},'
return f'SqliteDosStorage[{self.filepath_root}]: {state},'

def _initialise_session(self):
"""Initialise the SQLAlchemy session factory.
Expand All @@ -144,28 +159,22 @@ def _initialise_session(self):
Multi-thread support is currently required by the REST API.
Although, in the future, we may want to move the multi-thread handling to higher in the AiiDA stack.
"""
engine = create_sqla_engine(Path(self._profile.storage_config['filepath']) / 'database.sqlite')
engine = create_sqla_engine(self.filepath_database)
self._session_factory = scoped_session(sessionmaker(bind=engine, future=True, expire_on_commit=True))

def _backup(
self,
dest: str,
keep: Optional[int] = None,
):
raise NotImplementedError

def delete(self) -> None: # type: ignore[override]
"""Delete the storage and all the data."""
filepath = Path(self.profile.storage_config['filepath'])
if filepath.exists():
rmtree(filepath)
LOGGER.report(f'Deleted storage directory at `{filepath}`.')
if self.filepath_root.exists():
rmtree(self.filepath_root)
LOGGER.report(f'Deleted storage directory at `{self.filepath_root}`.')

def get_container(self) -> 'Container':
    """Return the disk-objectstore ``Container`` rooted at this storage's container directory."""
    return Container(str(self.filepath_container))

def get_repository(self) -> 'DiskObjectStoreRepositoryBackend':
from aiida.repository.backend import DiskObjectStoreRepositoryBackend

container = Container(str(Path(self.profile.storage_config['filepath']) / 'container'))
return DiskObjectStoreRepositoryBackend(container=container)
return DiskObjectStoreRepositoryBackend(container=self.get_container())

@classmethod
def version_head(cls) -> str:
Expand Down Expand Up @@ -225,3 +234,36 @@ def _get_mapper_from_entity(entity_type: 'EntityTypes', with_pk: bool):
mapper = inspect(model).mapper # type: ignore[union-attr]
keys = {key for key, col in mapper.c.items() if with_pk or col not in mapper.primary_key}
return mapper, keys

def _backup(
    self,
    dest: str,
    keep: Optional[int] = None,
):
    """Create a backup of this storage at ``dest`` using the disk-objectstore backup utilities.

    :param dest: Destination where the backup folders are created, handed to ``backup_utils.BackupManager``.
    :param keep: Number of previous backups to keep; ``None`` defers to the backup manager's default.
    :raises StorageBackupError: If the underlying backup utility reports a ``BackupError``.
    """
    try:
        manager = backup_utils.BackupManager(dest, keep=keep)

        def _do_backup(path, prev):
            # Delegate the actual copying of database and container to ``_backup_storage``.
            self._backup_storage(manager, path, prev)

        manager.backup_auto_folders(_do_backup)
    except backup_utils.BackupError as exc:
        # Translate into AiiDA's storage exception so callers need not know disk-objectstore internals.
        raise exceptions.StorageBackupError(*exc.args) from exc

def _backup_storage(
    self,
    manager: backup_utils.BackupManager,
    path: Path,
    prev_backup: Path | None = None,
) -> None:
    """Create a backup of the sqlite database and disk-objectstore to the provided path.

    :param manager: BackupManager from backup_utils containing utilities such as for calling the rsync.
    :param path: Path to where the backup will be created.
    :param prev_backup: Path to the previous backup. Rsync calls will be hard-linked to this path, making the backup
        incremental and efficient.
    """
    # Light maintenance first (``full=False``) so the container is in a consistent state while AiiDA may be running.
    LOGGER.report('Running storage maintenance')
    self.maintain(full=False, compress=False)

    LOGGER.report('Backing up disk-objectstore container')
    manager.call_rsync(self.filepath_container, path, link_dest=prev_backup, dest_trailing_slash=True)

    LOGGER.report('Backing up sqlite database')
    manager.call_rsync(self.filepath_database, path, link_dest=prev_backup, dest_trailing_slash=True)

0 comments on commit de0ed6b

Please sign in to comment.