Skip to content

Commit

Permalink
migrate scanner to new abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
maximemulder committed Feb 19, 2025
1 parent eca3d51 commit ab1289f
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 13 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ include = [
"python/lib/get_subject_session.py",
"python/lib/logging.py",
"python/lib/make_env.py",
"python/lib/scanner.py",
"python/lib/validate_subject_info.py",
]
typeCheckingMode = "strict"
Expand Down
6 changes: 6 additions & 0 deletions python/lib/database_lib/mri_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import datetime

from typing_extensions import deprecated

from lib.candidate import Candidate

__license__ = "GPLv3"


@deprecated('Use `lib.scanner` instead')
class MriScanner:
"""
This class performs database queries for imaging dataset stored in the mri_scanner table.
Expand Down Expand Up @@ -38,6 +41,7 @@ def __init__(self, db, verbose):
self.db = db
self.verbose = verbose

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def determine_scanner_information(self, manufacturer, software_version, serial_number, scanner_model,
center_id, project_id):
"""
Expand Down Expand Up @@ -81,6 +85,7 @@ def determine_scanner_information(self, manufacturer, software_version, serial_n
)
return scanner_id

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def register_new_scanner(self, manufacturer, software_version, serial_number, scanner_model, center_id, project_id):
"""
Inserts a new entry in the mri_scanner table after having created a new candidate to
Expand Down Expand Up @@ -132,6 +137,7 @@ def register_new_scanner(self, manufacturer, software_version, serial_number, sc

return scanner_id

@deprecated('Use `lib.db.models.mri_scanner.DbMriScanner.candidate` instead')
def get_scanner_candid(self, scanner_id):
"""
Select a ScannerID CandID based on the scanner ID in mri_scanner.
Expand Down
26 changes: 26 additions & 0 deletions python/lib/db/queries/mri_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Optional

from sqlalchemy import select
from sqlalchemy.orm import Session as Database

from lib.db.models.mri_scanner import DbMriScanner


def try_get_scanner_with_info(
db: Database,
manufacturer: str,
software_version: str,
serial_number: str,
model: str,
) -> Optional[DbMriScanner]:
"""
Get an MRI scanner from the database using the provided information, or return `None` if no
scanner is found.
"""

return db.execute(select(DbMriScanner)
.where(DbMriScanner.manufacturer == manufacturer)
.where(DbMriScanner.model == model)
.where(DbMriScanner.serial_number == serial_number)
.where(DbMriScanner.software_version == software_version)
).scalar_one_or_none()
15 changes: 9 additions & 6 deletions python/lib/dcm2bids_imaging_pipeline_lib/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from lib.imaging import Imaging
from lib.logging import log_error_exit, log_verbose, log_warning
from lib.make_env import make_env
from lib.scanner import get_or_create_scanner
from lib.validate_subject_info import validate_subject_info


Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(self, loris_getopt_obj, script_name):
))

# grep scanner information based on what is in the DICOM headers
self.scanner_id = self.determine_scanner_info()
self.mri_scanner = self.determine_scanner_info()

def load_mri_upload_and_dicom_archive(self):
"""
Expand Down Expand Up @@ -240,17 +241,19 @@ def determine_scanner_info(self):
"""
Determine the scanner information found in the database for the uploaded DICOM archive.
"""
scanner_id = self.imaging_obj.get_scanner_id(

mri_scanner = get_or_create_scanner(
self.env,
self.dicom_archive.scanner_manufacturer,
self.dicom_archive.scanner_software_version,
self.dicom_archive.scanner_serial_number,
self.dicom_archive.scanner_model,
self.dicom_archive.scanner_serial_number,
self.dicom_archive.scanner_software_version,
self.site_dict['CenterID'],
self.session.project_id if self.session is not None else None,
)

log_verbose(self.env, f"Found Scanner ID: {scanner_id}")
return scanner_id
log_verbose(self.env, f"Found scanner ID: {mri_scanner.id}")
return mri_scanner

def validate_subject_info(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from lib.exception.validate_subject_info_error import ValidateSubjectInfoError
from lib.get_subject_session import get_subject_session
from lib.logging import log_error_exit, log_verbose
from lib.scanner import get_or_create_scanner
from lib.validate_subject_info import validate_subject_info

__license__ = "GPLv3"
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(self, loris_getopt_obj, script_name):
if self.dicom_archive is not None:
self._validate_nifti_patient_name_with_dicom_patient_name()
self.subject_info = self.imaging_obj.determine_subject_info(
self.dicom_archive, self.scanner_id
self.dicom_archive, self.mri_scanner.id
)
else:
self._determine_subject_info_based_on_json_patient_name()
Expand Down Expand Up @@ -357,12 +358,13 @@ def _determine_acquisition_protocol(self):
scan_param = self.json_file_dict

# get scanner ID if not already figured out
if not self.scanner_id:
self.scanner_id = self.imaging_obj.get_scanner_id(
if self.mri_scanner is None:
self.mri_scanner = get_or_create_scanner(
self.env,
self.json_file_dict['Manufacturer'],
self.json_file_dict['SoftwareVersions'],
self.json_file_dict['DeviceSerialNumber'],
self.json_file_dict['ManufacturersModelName'],
self.json_file_dict['DeviceSerialNumber'],
self.json_file_dict['SoftwareVersions'],
self.site_dict['CenterID'],
self.session.project_id,
)
Expand All @@ -373,7 +375,7 @@ def _determine_acquisition_protocol(self):
self.session.cohort_id,
self.session.site_id,
self.session.visit_label,
self.scanner_id
self.mri_scanner.id,
)

protocol_info = self.imaging_obj.get_acquisition_protocol_info(
Expand Down Expand Up @@ -701,7 +703,7 @@ def _register_into_files_and_parameter_file(self, nifti_rel_path):
'InsertTime': datetime.datetime.now().timestamp(),
'Caveat': 1 if self.warning_violations_list else 0,
'TarchiveSource': self.dicom_archive.id,
'ScannerID': self.scanner_id,
'ScannerID': self.mri_scanner.id,
'AcquisitionDate': acquisition_date,
'SourceFileID': None
}
Expand Down
3 changes: 3 additions & 0 deletions python/lib/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import nibabel as nib
from nilearn import image, plotting
from typing_extensions import deprecated

import lib.utilities as utilities
from lib.config_file import SubjectInfo
Expand Down Expand Up @@ -832,6 +833,7 @@ def get_violations(self, checks_list, header, severity, scan_param_dict):
'MriProtocolChecksGroupID': hdr_checks_list[0]['MriProtocolChecksGroupID']
}

@deprecated('Use `lib.scanner.get_or_create_scanner` instead')
def get_scanner_id(self, manufacturer, software_version, serial_nb, model_name, center_id, project_id):
"""
Get the scanner ID based on the scanner information provided as input.
Expand All @@ -858,6 +860,7 @@ def get_scanner_id(self, manufacturer, software_version, serial_nb, model_name,
project_id
)

@deprecated('Use `lib.db.models.DbScanner.candidate` instead')
def get_scanner_candid(self, scanner_id):
"""
Select a ScannerID CandID based on the scanner ID in mri_scanner.
Expand Down
74 changes: 74 additions & 0 deletions python/lib/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import random
from datetime import datetime

from sqlalchemy.orm import Session as Database

from lib.db.models.candidate import DbCandidate
from lib.db.models.mri_scanner import DbMriScanner
from lib.db.queries.candidate import try_get_candidate_with_cand_id
from lib.db.queries.mri_scanner import try_get_scanner_with_info
from lib.env import Env


def get_or_create_scanner(
env: Env,
manufacturer: str,
model: str,
serial_number: str,
software_version: str,
site_id: int,
project_id: int,
) -> DbMriScanner:
"""
Get an MRI scanner from the database using the provided information, or create it if it does
not already exist.
"""

mri_scanner = try_get_scanner_with_info(env.db, manufacturer, model, serial_number, software_version)

if mri_scanner is not None:
return mri_scanner

cand_id = generate_new_cand_id(env.db)
now = datetime.now()

candidate = DbCandidate(
cand_id = cand_id,
psc_id = 'scanner',
registration_site_id = site_id,
registration_project_id = project_id,
user_id = 'imaging.py',
entity_type = 'Scanner',
date_active = now,
date_registered = now,
active = True,
)

env.db.add(candidate)
env.db.commit()

mri_scanner = DbMriScanner(
manufacturer = manufacturer,
model = model,
serial_number = serial_number,
software_version = software_version,
candidate_id = candidate.id,
)

env.db.add(mri_scanner)
env.db.commit()

return mri_scanner


# TODO: Move this function to a more appropriate place.
def generate_new_cand_id(db: Database) -> int:
"""
Generate a new random CandID that is not already in the database.
"""

while True:
cand_id = random.randint(100000, 999999)
candidate = try_get_candidate_with_cand_id(db, cand_id)
if candidate is None:
return cand_id

0 comments on commit ab1289f

Please sign in to comment.