Skip to content

Commit

Permalink
migrate scanner to new abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
maximemulder committed Feb 16, 2025
1 parent eca3d51 commit 84359e3
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 9 deletions.
6 changes: 6 additions & 0 deletions python/lib/database_lib/mri_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import datetime

from typing_extensions import deprecated

from lib.candidate import Candidate

__license__ = "GPLv3"


@deprecated('Use `lib.scanner` instead')
class MriScanner:
"""
This class performs database queries for imaging dataset stored in the mri_scanner table.
Expand Down Expand Up @@ -38,6 +41,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def determine_scanner_information(self, manufacturer, software_version, serial_number, scanner_model,
center_id, project_id):
"""
Expand Down Expand Up @@ -81,6 +85,7 @@ def determine_scanner_information(self, manufacturer, software_version, serial_n
)
return scanner_id

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def register_new_scanner(self, manufacturer, software_version, serial_number, scanner_model, center_id, project_id):
"""
Inserts a new entry in the mri_scanner table after having created a new candidate to
Expand Down Expand Up @@ -132,6 +137,7 @@ def register_new_scanner(self, manufacturer, software_version, serial_number, sc

return scanner_id

@deprecated('Use `lib.db.models.mri_scanner.DbMriScanner.candidate` instead')
def get_scanner_candid(self, scanner_id):
"""
Select a ScannerID CandID based on the scanner ID in mri_scanner.
Expand Down
26 changes: 26 additions & 0 deletions python/lib/db/queries/mri_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Optional

from sqlalchemy import select
from sqlalchemy.orm import Session as Database

from lib.db.models.mri_scanner import DbMriScanner


def try_get_scanner_with_info(
db: Database,
manufacturer: str,
software_version: str,
serial_number: str,
model: str,
) -> Optional[DbMriScanner]:
"""
Get an MRI scanner from the database using the provided information, or return `None` if no
scanner is found.
"""

return db.execute(select(DbMriScanner)
.where(DbMriScanner.manufacturer == manufacturer)
.where(DbMriScanner.model == model)
.where(DbMriScanner.serial_number == serial_number)
.where(DbMriScanner.software_version == software_version)
).scalar_one_or_none()
10 changes: 6 additions & 4 deletions python/lib/dcm2bids_imaging_pipeline_lib/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from lib.database import Database
from lib.database_lib.config import Config
from lib.db.queries.dicom_archive import try_get_dicom_archive_with_archive_location
from lib.db.queries.mri_scanner import get_or_create_scanner_with_info
from lib.db.queries.mri_upload import try_get_mri_upload_with_id
from lib.db.queries.session import try_get_session_with_cand_id_visit_label
from lib.db.queries.site import get_all_sites
Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, loris_getopt_obj, script_name):
))

# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()
self.mri_scanner = self.determine_scanner_info()

def load_mri_upload_and_dicom_archive(self):
"""
Expand Down Expand Up @@ -240,7 +241,8 @@ def determine_scanner_info(self):
"""
Determine the scanner information found in the database for the uploaded DICOM archive.
"""
scanner_id = self.imaging_obj.get_scanner_id(

mri_scanner = get_or_create_scanner_with_info(
self.dicom_archive.scanner_manufacturer,
self.dicom_archive.scanner_software_version,
self.dicom_archive.scanner_serial_number,
Expand All @@ -249,8 +251,8 @@ def determine_scanner_info(self):
self.session.project_id if self.session is not None else None,
)

log_verbose(self.env, f"Found Scanner ID: {scanner_id}")
return scanner_id
log_verbose(self.env, f"Found scanner ID: {mri_scanner.id}")
return mri_scanner

def validate_subject_info(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lib.exitcode
import lib.utilities as utilities
from lib.db.queries.dicom_archive import try_get_dicom_archive_series_with_series_uid_echo_time
from lib.db.queries.mri_scanner import get_or_create_scanner_with_info
from lib.dcm2bids_imaging_pipeline_lib.base_pipeline import BasePipeline
from lib.exception.determine_subject_info_error import DetermineSubjectInfoError
from lib.exception.validate_subject_info_error import ValidateSubjectInfoError
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(self, loris_getopt_obj, script_name):
if self.dicom_archive is not None:
self._validate_nifti_patient_name_with_dicom_patient_name()
self.subject_info = self.imaging_obj.determine_subject_info(
self.dicom_archive, self.scanner_id
self.dicom_archive, self.mri_scanner.id
)
else:
self._determine_subject_info_based_on_json_patient_name()
Expand Down Expand Up @@ -357,8 +358,8 @@ def _determine_acquisition_protocol(self):
scan_param = self.json_file_dict

# get scanner ID if not already figured out
if not self.scanner_id:
self.scanner_id = self.imaging_obj.get_scanner_id(
if self.mri_scanner is None:
self.mri_scanner = get_or_create_scanner_with_info(
self.json_file_dict['Manufacturer'],
self.json_file_dict['SoftwareVersions'],
self.json_file_dict['DeviceSerialNumber'],
Expand All @@ -373,7 +374,7 @@ def _determine_acquisition_protocol(self):
self.session.cohort_id,
self.session.site_id,
self.session.visit_label,
self.scanner_id
self.mri_scanner.id,
)

protocol_info = self.imaging_obj.get_acquisition_protocol_info(
Expand Down Expand Up @@ -701,7 +702,7 @@ def _register_into_files_and_parameter_file(self, nifti_rel_path):
'InsertTime': datetime.datetime.now().timestamp(),
'Caveat': 1 if self.warning_violations_list else 0,
'TarchiveSource': self.dicom_archive.id,
'ScannerID': self.scanner_id,
'ScannerID': self.mri_scanner.id,
'AcquisitionDate': acquisition_date,
'SourceFileID': None
}
Expand Down
3 changes: 3 additions & 0 deletions python/lib/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import nibabel as nib
from nilearn import image, plotting
from typing_extensions import deprecated

import lib.utilities as utilities
from lib.config_file import SubjectInfo
Expand Down Expand Up @@ -832,6 +833,7 @@ def get_violations(self, checks_list, header, severity, scan_param_dict):
'MriProtocolChecksGroupID': hdr_checks_list[0]['MriProtocolChecksGroupID']
}

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def get_scanner_id(self, manufacturer, software_version, serial_nb, model_name, center_id, project_id):
"""
Get the scanner ID based on the scanner information provided as input.
Expand All @@ -858,6 +860,7 @@ def get_scanner_id(self, manufacturer, software_version, serial_nb, model_name,
project_id
)

@deprecated('Use `lib.db.models.DbScanner.candidate` instead')
def get_scanner_candid(self, scanner_id):
"""
Select a ScannerID CandID based on the scanner ID in mri_scanner.
Expand Down
70 changes: 70 additions & 0 deletions python/lib/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import random
from datetime import datetime

from sqlalchemy.orm import Session as Database

from lib.db.models.candidate import DbCandidate
from lib.db.models.mri_scanner import DbMriScanner
from lib.db.queries.candidate import try_get_candidate_with_cand_id
from lib.db.queries.mri_scanner import try_get_scanner_with_info
from lib.env import Env


def get_or_create_scanner(
env: Env,
manufacturer: str,
model: str,
serial_number: str,
software_version: str,
site_id: int,
project_id: int,
) -> DbMriScanner:
"""
Get an MRI scanner from the database using the provided information, or create it if it does
not already exist.
"""

mri_scanner = try_get_scanner_with_info(env.db, manufacturer, model, serial_number, software_version)

if mri_scanner is not None:
return mri_scanner

cand_id = generate_new_cand_id(env.db)
now = datetime.now()

candidate = DbCandidate(
cand_id = cand_id,
psc_id = 'scanner',
site_id = site_id,
registration_site_id = site_id,
registration_project_id = project_id,
user_id = 'imaging.py',
entity_type = 'Scanner',
date_active = now,
date_registered = now,
)

mri_scanner = DbMriScanner(
manufacturer = manufacturer,
model = model,
serial_nuber = serial_number,
software_version = software_version,
candidate_id = candidate.id,
)

env.db.commit()

return mri_scanner


# TODO: Move this function to a more appropriate place.
def generate_new_cand_id(db: Database) -> int:
"""
Generate a new random CandID that is not already in the database.
"""

while True:
cand_id = random.randint(100000, 999999)
candidate = try_get_candidate_with_cand_id(db, cand_id)
if candidate is None:
return cand_id

0 comments on commit 84359e3

Please sign in to comment.