Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate DICOM archive files to new database abstraction #1217

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/lib/database_lib/tarchive.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""This class performs DICOM archive related database queries and common checks"""

from typing_extensions import deprecated

__license__ = "GPLv3"


@deprecated('Use `lib.db.models.dicom_archive.DbDicomArchive` instead')
class Tarchive:
"""
This class performs database queries for DICOM archives.
Expand Down Expand Up @@ -35,6 +37,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.db.queries.dicom_archive.try_get_dicom_archive_with_*` instead')
def create_tarchive_dict(self, archive_location=None, tarchive_id=None):
"""
Create dictionary with DICOM archive information selected from the tarchive table.
Expand Down Expand Up @@ -62,6 +65,7 @@ def create_tarchive_dict(self, archive_location=None, tarchive_id=None):

return results[0] if results else None

@deprecated('Use `lib.db.models.dicom_archive.DbDicomArchive` instead')
def update_tarchive(self, tarchive_id, fields, values):
"""
Updates the tarchive table for a given TarchiveID.
Expand Down
3 changes: 3 additions & 0 deletions python/lib/database_lib/tarchive_series.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""This class performs tarchive_series related database queries and common checks"""

from typing_extensions import deprecated

__license__ = "GPLv3"


@deprecated('Use `lib.db.models.dicom_archive_series.DbDicomArchiveSeries` instead')
class TarchiveSeries:
"""
This class performs database queries for tarchive_series table.
Expand Down Expand Up @@ -35,6 +37,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.db.queries.dicom_archive.try_get_dicom_archive_series_with_series_uid_echo_time` instead')
def get_tarchive_series_from_series_uid_and_echo_time(self, series_uid, echo_time):
"""
Create dictionary with DICOM archive information selected from the tarchive table.
Expand Down
46 changes: 42 additions & 4 deletions python/lib/db/queries/dicom_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,34 @@
from lib.db.models.dicom_archive_series import DbDicomArchiveSeries


def try_get_dicom_archive_with_study_uid(db: Database, study_uid: str):
def try_get_dicom_archive_with_id(db: Database, dicom_archive_id: int) -> Optional[DbDicomArchive]:
    """
    Look up a DICOM archive by its database ID.

    Return the matching `DbDicomArchive`, or `None` when no archive has that ID.
    """

    query = select(DbDicomArchive).where(DbDicomArchive.id == dicom_archive_id)
    return db.execute(query).scalar_one_or_none()


def try_get_dicom_archive_with_archive_location(db: Database, archive_location: str) -> Optional[DbDicomArchive]:
    """
    Look up a DICOM archive whose archive location contains the given path fragment.

    Return the matching `DbDicomArchive`, or `None` when no archive location matches.
    """

    query = select(DbDicomArchive).where(
        DbDicomArchive.archive_location.like(f'%{archive_location}%')
    )
    return db.execute(query).scalar_one_or_none()


def try_get_dicom_archive_with_study_uid(db: Database, study_uid: str) -> Optional[DbDicomArchive]:
    """
    Get a DICOM archive from the database using its study UID, or return `None` if no DICOM archive
    is found.
    """

    query = select(DbDicomArchive).where(DbDicomArchive.study_uid == study_uid)
    return db.execute(query).scalar_one_or_none()

Expand All @@ -39,8 +61,8 @@ def get_dicom_archive_series_with_file_info(
sequence_name: Optional[str],
):
"""
Get a DICOM archive series from the database using its file information, or raise an
exception if no DICOM archive series is found.
Get a DICOM archive series from the database using its file information, or raise an exception
if no DICOM archive series is found.
"""

query = select(DbDicomArchiveSeries) \
Expand All @@ -50,3 +72,19 @@ def get_dicom_archive_series_with_file_info(
.where(DbDicomArchiveSeries.sequence_name == sequence_name)

return db.execute(query).scalar_one()


def try_get_dicom_archive_series_with_series_uid_echo_time(
    db: Database,
    series_uid: str,
    echo_time: float,
) -> Optional[DbDicomArchiveSeries]:
    """
    Look up a DICOM archive series by its series UID and echo time.

    Return the matching `DbDicomArchiveSeries`, or `None` when no series matches both values.
    """

    query = (
        select(DbDicomArchiveSeries)
        .where(DbDicomArchiveSeries.series_uid == series_uid)
        .where(DbDicomArchiveSeries.echo_time == echo_time)
    )
    return db.execute(query).scalar_one_or_none()
35 changes: 18 additions & 17 deletions python/lib/dcm2bids_imaging_pipeline_lib/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import lib.utilities
from lib.database import Database
from lib.database_lib.config import Config
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location, try_get_dicom_archive_with_id
from lib.db.queries.session import try_get_session_with_cand_id_visit_label
from lib.db.queries.site import get_all_sites
from lib.dicom_archive import DicomArchive
from lib.exception.determine_subject_info_error import DetermineSubjectInfoError
from lib.exception.validate_subject_info_error import ValidateSubjectInfoError
from lib.imaging import Imaging
Expand Down Expand Up @@ -63,7 +63,6 @@ def __init__(self, loris_getopt_obj, script_name):
# Load the Config, Imaging, ImagingUpload, Tarchive, Session database classes
# -----------------------------------------------------------------------------------
self.config_db_obj = Config(self.db, self.verbose)
self.dicom_archive_obj = DicomArchive(self.db, self.verbose)
self.imaging_obj = Imaging(self.db, self.verbose, self.config_file)
self.imaging_upload_obj = ImagingUpload(self.db, self.verbose)

Expand Down Expand Up @@ -102,9 +101,9 @@ def __init__(self, loris_getopt_obj, script_name):
# Verify PSC information stored in DICOMs
# Grep scanner information based on what is in the DICOM headers
# ---------------------------------------------------------------------------------
if self.dicom_archive_obj.tarchive_info_dict.keys():
if self.dicom_archive is not None:
try:
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive_obj.tarchive_info_dict)
self.subject_info = self.imaging_obj.determine_subject_info(self.dicom_archive)
except DetermineSubjectInfoError as error:
log_error_exit(self.env, error.message, lib.exitcode.PROJECT_CUSTOMIZATION_FAILURE)

Expand Down Expand Up @@ -144,9 +143,9 @@ def load_imaging_upload_and_tarchive_dictionaries(self):
f"UploadID {upload_id} is not linked to any tarchive in mri_upload.",
lib.exitcode.SELECT_FAILURE,
)
self.dicom_archive_obj.populate_tarchive_info_dict_from_tarchive_id(tarchive_id=tarchive_id)
db_archive_location = self.dicom_archive_obj.tarchive_info_dict['ArchiveLocation']
if os.path.join(self.data_dir, 'tarchive', db_archive_location) != tarchive_path:

self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
if os.path.join(self.data_dir, 'tarchive', self.dicom_archive.archive_location) != tarchive_path:
log_error_exit(
self.env,
f"UploadID {upload_id} and ArchiveLocation {tarchive_path} do not refer to the same upload",
Expand All @@ -161,18 +160,20 @@ def load_imaging_upload_and_tarchive_dictionaries(self):
else:
if self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]:
tarchive_id = self.imaging_upload_obj.imaging_upload_dict["TarchiveID"]
self.dicom_archive_obj.populate_tarchive_info_dict_from_tarchive_id(tarchive_id=tarchive_id)
if self.dicom_archive_obj.tarchive_info_dict:
self.dicom_archive = try_get_dicom_archive_with_id(self.env.db, tarchive_id)
if self.dicom_archive is not None:
success = True
else:
err_msg += f"Could not load tarchive dictionary for TarchiveID {tarchive_id}"

elif tarchive_path:
archive_location = tarchive_path.replace(self.dicom_lib_dir, "")
self.dicom_archive_obj.populate_tarchive_info_dict_from_archive_location(archive_location=archive_location)
if self.dicom_archive_obj.tarchive_info_dict:
tarchive_id = self.dicom_archive_obj.tarchive_info_dict["TarchiveID"]
success, new_err_msg = self.imaging_upload_obj.create_imaging_upload_dict_from_tarchive_id(tarchive_id)
self.dicom_archive = try_get_dicom_archive_with_archive_location(self.env.db, archive_location)
if self.dicom_archive is not None:
success, new_err_msg = self.imaging_upload_obj.create_imaging_upload_dict_from_tarchive_id(
self.dicom_archive.id
)

if not success:
err_msg += new_err_msg
else:
Expand Down Expand Up @@ -222,10 +223,10 @@ def determine_scanner_info(self):
Determine the scanner information found in the database for the uploaded DICOM archive.
"""
scanner_id = self.imaging_obj.get_scanner_id(
self.dicom_archive_obj.tarchive_info_dict['ScannerManufacturer'],
self.dicom_archive_obj.tarchive_info_dict['ScannerSoftwareVersion'],
self.dicom_archive_obj.tarchive_info_dict['ScannerSerialNumber'],
self.dicom_archive_obj.tarchive_info_dict['ScannerModel'],
self.dicom_archive.scanner_manufacturer,
self.dicom_archive.scanner_software_version,
self.dicom_archive.scanner_serial_number,
self.dicom_archive.scanner_model,
self.site_dict['CenterID'],
self.session.project_id if self.session is not None else None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ def __init__(self, loris_getopt_obj, script_name):
super().__init__(loris_getopt_obj, script_name)
self.series_uid = self.options_dict["series_uid"]["value"]
self.tarchive_path = os.path.join(
self.data_dir, "tarchive", self.dicom_archive_obj.tarchive_info_dict["ArchiveLocation"]
self.data_dir, "tarchive", self.dicom_archive.archive_location
)
self.tarchive_id = self.dicom_archive_obj.tarchive_info_dict["TarchiveID"]

# ---------------------------------------------------------------------------------------------
# Run the DICOM archive validation script to check if the DICOM archive is valid
Expand All @@ -50,7 +49,7 @@ def __init__(self, loris_getopt_obj, script_name):
# Extract DICOM files from the tarchive
# ---------------------------------------------------------------------------------------------
self.extracted_dicom_dir = self.imaging_obj.extract_files_from_dicom_archive(
os.path.join(self.data_dir, 'tarchive', self.dicom_archive_obj.tarchive_info_dict["ArchiveLocation"]),
os.path.join(self.data_dir, 'tarchive', self.dicom_archive.archive_location),
self.tmp_dir
)

Expand Down Expand Up @@ -331,12 +330,9 @@ def _move_and_update_dicom_archive(self):
the `tarchive` table with the new `ArchiveLocation` and `SessionID`.
"""

tarchive_id = self.tarchive_id
acq_date = self.dicom_archive_obj.tarchive_info_dict["DateAcquired"]
archive_location = self.dicom_archive_obj.tarchive_info_dict["ArchiveLocation"]
acq_date = self.dicom_archive.date_acquired
archive_location = self.dicom_archive.archive_location

fields_to_update = ("SessionID",)
values_for_update = (self.session.id,)
pattern = re.compile("^[0-9]{4}/")
if acq_date and not pattern.match(archive_location):
# move the DICOM archive into a year subfolder
Expand All @@ -350,10 +346,12 @@ def _move_and_update_dicom_archive(self):
os.replace(self.tarchive_path, new_tarchive_path)
self.tarchive_path = new_tarchive_path
# add the new archive location to the list of fields to update in the tarchive table
fields_to_update += ("ArchiveLocation",)
values_for_update += (new_archive_location,)
self.dicom_archive.archive_location = new_archive_location

self.dicom_archive_obj.tarchive_db_obj.update_tarchive(tarchive_id, fields_to_update, values_for_update)
self.dicom_archive.session = self.session

# Update the DICOM archive in the database
self.env.db.commit()

def _compute_snr(self):
# TODO: to be implemented later on. No clear paths as to how to compute that
Expand All @@ -364,7 +362,7 @@ def _add_intended_for_to_fieldmap_json_files(self):
Add IntendedFor field in JSON file of fieldmap acquisitions according to BIDS standard for fieldmaps.
"""

fmap_files_dict = self.imaging_obj.determine_intended_for_field_for_fmap_json_files(self.tarchive_id)
fmap_files_dict = self.imaging_obj.determine_intended_for_field_for_fmap_json_files(self.dicom_archive.id)
if not fmap_files_dict:
return

Expand All @@ -382,13 +380,12 @@ def _order_modalities_per_acquisition_type(self):
Determine the file order based on the modality and populated the `files` table field `AcqOrderPerModality`.
"""

tarchive_id = self.tarchive_id
scan_type_id_list = self.imaging_obj.files_db_obj.select_distinct_acquisition_protocol_id_per_tarchive_source(
tarchive_id
self.dicom_archive.id
)
for scan_type_id in scan_type_id_list:
results = self.imaging_obj.files_db_obj.get_file_ids_and_series_number_per_scan_type_and_tarchive_id(
tarchive_id, scan_type_id
self.dicom_archive.id, scan_type_id
)
file_id_series_nb_ordered_list = sorted(results, key=lambda x: x["SeriesNumber"])
acq_number = 0
Expand All @@ -408,7 +405,7 @@ def _update_mri_upload(self):
- `SessionID` => `SessionID` associated to the upload
"""

files_inserted_list = self.imaging_obj.files_db_obj.get_files_inserted_for_tarchive_id(self.tarchive_id)
files_inserted_list = self.imaging_obj.files_db_obj.get_files_inserted_for_tarchive_id(self.dicom_archive.id)
self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id,
fields=("Inserting", "InsertionComplete", "number_of_mincInserted", "number_of_mincCreated", "SessionID"),
Expand All @@ -426,14 +423,14 @@ def _get_summary_of_insertion(self):
- path to the log file
"""

files_results = self.imaging_obj.files_db_obj.get_files_inserted_for_tarchive_id(self.tarchive_id)
files_results = self.imaging_obj.files_db_obj.get_files_inserted_for_tarchive_id(self.dicom_archive.id)
files_inserted_list = [v["File"] for v in files_results] if files_results else None
prot_viol_results = self.imaging_obj.mri_prot_viol_scan_db_obj.get_protocol_violations_for_tarchive_id(
self.tarchive_id
self.dicom_archive.id
)
protocol_violations_list = [v["minc_location"] for v in prot_viol_results] if prot_viol_results else None
excl_viol_results = self.imaging_obj.mri_viol_log_db_obj.get_violations_for_tarchive_id(
self.tarchive_id, "exclude"
self.dicom_archive.id, "exclude"
)
excluded_violations_list = [v["MincFile"] for v in excl_viol_results] if excl_viol_results else None

Expand All @@ -447,7 +444,7 @@ def _get_summary_of_insertion(self):

summary = f"""
Finished processing UploadID {self.upload_id}!
- DICOM archive info: {self.tarchive_id} => {self.tarchive_path}
- DICOM archive info: {self.dicom_archive.id} => {self.tarchive_path}
- {nb_files_inserted} files were inserted into the files table: {files_list}
- {nb_prot_violation} files did not match any protocol: {prot_viol_list}
- {nb_excluded_viol} files were exclusionary violations: {excl_viol_list}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import sys

import lib.exitcode
import lib.utilities as utilities
from lib.db.models.dicom_archive import DbDicomArchive
from lib.dcm2bids_imaging_pipeline_lib.base_pipeline import BasePipeline
from lib.env import Env
from lib.logging import log_error_exit, log_verbose

__license__ = "GPLv3"
Expand Down Expand Up @@ -51,16 +54,40 @@ def _validate_dicom_archive_md5sum(self):

log_verbose(self.env, "Verifying DICOM archive md5sum (checksum)")

tarchive_path = os.path.join(self.dicom_lib_dir, self.dicom_archive_obj.tarchive_info_dict["ArchiveLocation"])
result = self.dicom_archive_obj.validate_dicom_archive_md5sum(tarchive_path)
message = result["message"]

if result['success']:
log_verbose(self.env, message)
else:
dicom_archive_path = os.path.join(self.dicom_lib_dir, self.dicom_archive.archive_location)
result = _validate_dicom_archive_md5sum(self.env, self.dicom_archive, dicom_archive_path)
if not result:
self.imaging_upload_obj.update_mri_upload(
upload_id=self.upload_id,
fields=("isTarchiveValidated", "IsCandidateInfoValidated"),
values=("0", "0")
)
log_error_exit(self.env, message, lib.exitcode.CORRUPTED_FILE)

log_error_exit(
self.env,
"ERROR: DICOM archive seems corrupted or modified. Upload will exit now.",
lib.exitcode.CORRUPTED_FILE,
)


def _validate_dicom_archive_md5sum(env: Env, dicom_archive: DbDicomArchive, dicom_archive_path: str) -> bool:
    """
    Validate that the md5sum of the DICOM archive file on the filesystem is the same as the
    md5sum of the registered entry in the tarchive table.

    Return `True` if the MD5 sums match, or `False` if they don't.
    """

    # compute the md5sum of the tarchive file
    dicom_archive_file_md5_sum = utilities.compute_md5_hash(dicom_archive_path)

    # grep the md5sum stored in the database; the field holds "<md5sum> <filename>",
    # so keep only the first whitespace-separated token
    dicom_archive_db_md5_sum = dicom_archive.md5_sum_archive.split()[0]

    log_verbose(
        env,
        f"checksum for target: {dicom_archive_file_md5_sum}; checksum from database: {dicom_archive_db_md5_sum}",
    )

    # check that the two md5sum are the same
    return dicom_archive_file_md5_sum == dicom_archive_db_md5_sum
Loading
Loading