Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new schema mpi fuctions #30

Merged
merged 23 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
64996cb
converting BlockingKey to an Enum
ericbuckley Sep 18, 2024
f524140
new implementations of get_block_data and insert_matched_patient for …
ericbuckley Sep 18, 2024
2da7ecc
Merge branch 'main' into ericbuckley/10-new-schema-mpi-fuctions
ericbuckley Sep 18, 2024
2cfdc42
pinning pyway to 0.3.29
ericbuckley Sep 18, 2024
2f9727a
adding test cases for BlockingKey
ericbuckley Sep 18, 2024
1a70c84
moving external person into patient table
ericbuckley Sep 19, 2024
84722f2
ignore public docstrings in tests
ericbuckley Sep 19, 2024
9dc1a26
ignoring .DS_Store
ericbuckley Sep 19, 2024
1f86556
changing tests to pytest style
ericbuckley Sep 19, 2024
594cba2
adding TEST_DB_URI for short-term
ericbuckley Sep 19, 2024
22c92df
tests for simple mpi
ericbuckley Sep 19, 2024
de79ec4
Merge branch 'main' into ericbuckley/10-new-schema-mpi-fuctions
ericbuckley Sep 19, 2024
eb5511f
adding another test for get_block_data
ericbuckley Sep 19, 2024
abbfbf1
removing dep on python-dotenv
ericbuckley Sep 19, 2024
2f4c812
adding Patient.external_patient_id
ericbuckley Sep 19, 2024
745cd80
Merge branch 'main' into ericbuckley/10-new-schema-mpi-fuctions
ericbuckley Sep 19, 2024
406f87b
BlockingKey.to_value should return set for performance
ericbuckley Sep 19, 2024
7fcc028
fix test
ericbuckley Sep 20, 2024
cf27dfa
better support for empty PII blocking values
ericbuckley Sep 20, 2024
9a5d7ec
BlockingKey function cleanup to make it more readable
ericbuckley Sep 20, 2024
e2d4bbf
small change to PR template
ericbuckley Sep 20, 2024
9f11ead
Merge branch 'main' into ericbuckley/10-new-schema-mpi-fuctions
ericbuckley Sep 20, 2024
0b123ae
adding optimization to shortcirtuit get_block_data early if blocking …
ericbuckley Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DB_URI="postgresql+psycopg2://postgres:pw@localhost:5432/postgres"
TEST_DB_URI="sqlite:///:memory:"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ temp/
*.tmp

# macOS
.DS_Store
# ignore all files ending in .DS_Store
**/.DS_Store

# Python-Virtual Environments
.venv
Expand Down
5 changes: 2 additions & 3 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from alembic import context

import dotenv
ENV = dotenv.dotenv_values()

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
Expand All @@ -16,7 +14,8 @@
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
config.set_main_option("sqlalchemy.url", ENV["DB_URI"])
from recordlinker.config import settings
config.set_main_option("sqlalchemy.url", settings.DB_URI)

# add your model's MetaData object here
# for 'autogenerate' support
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""external person id should be nullable

Revision ID: 64ed9566f189
Revises: bfbd015ca466
Create Date: 2024-09-18 20:22:07.510203

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '64ed9566f189'
down_revision: Union[str, None] = 'bfbd015ca466'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('ix_mpi_blocking_value_value', table_name='mpi_blocking_value')
op.alter_column('mpi_patient', 'external_person_id',
existing_type=sa.VARCHAR(length=255),
nullable=True)
op.alter_column('mpi_patient', 'external_person_source',
existing_type=sa.VARCHAR(length=100),
nullable=True)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('mpi_patient', 'external_person_source',
existing_type=sa.VARCHAR(length=100),
nullable=False)
op.alter_column('mpi_patient', 'external_person_id',
existing_type=sa.VARCHAR(length=255),
nullable=False)
op.create_index('ix_mpi_blocking_value_value', 'mpi_blocking_value', ['value'], unique=False)
# ### end Alembic commands ###
57 changes: 57 additions & 0 deletions alembic/versions/ad18f1d41fad_convert_blockingkey_into_enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""convert BlockingKey into enum

Revision ID: ad18f1d41fad
Revises: 6052c193a26a
Create Date: 2024-09-17 21:15:37.714595

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'ad18f1d41fad'
down_revision: Union[str, None] = '6052c193a26a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('mpi_blocking_key')
op.drop_table('mpi_blocking_value')

op.create_table('mpi_blocking_value',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('patient_id', sa.Integer(), nullable=False),
sa.Column('blockingkey', sa.Integer(), nullable=False),
sa.Column('value', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['patient_id'], ['mpi_patient.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index('idx_blocking_value_patient_key_value', 'mpi_blocking_value', ['patient_id', 'blockingkey', 'value'], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('mpi_blocking_value')

op.create_table('mpi_blocking_key',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=50), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('mpi_blocking_value',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('patient_id', sa.Integer(), nullable=False),
sa.Column('blockingkey_id', sa.Integer(), nullable=False),
sa.Column('value', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['blockingkey_id'], ['mpi_blocking_key.id'], ),
sa.ForeignKeyConstraint(['patient_id'], ['mpi_patient.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_mpi_blocking_value_value'), 'mpi_blocking_value', ['value'], unique=False)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""moving ExternalPerson to Patient table

Revision ID: bfbd015ca466
Revises: ad18f1d41fad
Create Date: 2024-09-18 20:10:30.193941

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'bfbd015ca466'
down_revision: Union[str, None] = 'ad18f1d41fad'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('mpi_external_person')
op.create_index(op.f('ix_mpi_blocking_value_value'), 'mpi_blocking_value', ['value'], unique=False)
op.add_column('mpi_patient', sa.Column('external_person_id', sa.String(length=255), nullable=False))
op.add_column('mpi_patient', sa.Column('external_person_source', sa.String(length=100), nullable=False))
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('mpi_patient', 'external_person_source')
op.drop_column('mpi_patient', 'external_person_id')
op.drop_index(op.f('ix_mpi_blocking_value_value'), table_name='mpi_blocking_value')
op.create_table('mpi_external_person',
sa.Column('id', sa.INTEGER(), nullable=False),
sa.Column('person_id', sa.INTEGER(), nullable=False),
sa.Column('external_id', sa.VARCHAR(length=255), nullable=False),
sa.Column('source', sa.VARCHAR(length=255), nullable=False),
sa.ForeignKeyConstraint(['person_id'], ['mpi_person.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
Binary file removed assets/.DS_Store
Binary file not shown.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"fastapi",
"pydantic",
"pydantic-settings",
"python-dateutil==2.9.0",
"sqlalchemy",
"fhirpathpy",
"rapidfuzz",
Expand All @@ -37,7 +38,6 @@ dev = [
"pyarrow",
"httpx",
"alembic",
"python-dotenv"
]
prod = [
# List any additional production-only dependencies here
Expand Down Expand Up @@ -65,6 +65,7 @@ select = ["E4", "E7", "E9", "F", "I", "D102", "D103", "D104", "D105", "D106"] #

[tool.ruff.lint.per-file-ignores]
"**/__init__.py" = ["D"]
"tests/*.py" = ["D102", "D103"] # Ignore the public docstring rules in test files

[tool.ruff.lint.isort]
# The following settings reduce the number of changes from reorder-python-imports
Expand Down
Binary file removed src/.DS_Store
Binary file not shown.
Binary file removed src/recordlinker/.DS_Store
Binary file not shown.
3 changes: 3 additions & 0 deletions src/recordlinker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Settings(pydantic_settings.BaseSettings):
)

db_uri: str = pydantic.Field(description="The URI for the MPI database")
# FIXME: A separate URI for testing is temporary, this is necessary right now because
# the old schema only works with postgresql. Once the old schema is removed we can consolidate
test_db_uri: str = pydantic.Field(description="The URI for the MPI database to run tests against")
connection_pool_size: typing.Optional[int] = pydantic.Field(
description="The number of MPI database connections in the connection pool",
default=5,
Expand Down
150 changes: 127 additions & 23 deletions src/recordlinker/linkage/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import datetime
import enum
import uuid

from sqlalchemy import ForeignKey
from sqlalchemy import JSON
import dateutil.parser
from sqlalchemy import create_engine
from sqlalchemy import orm
from sqlalchemy import String
from sqlalchemy import schema
from sqlalchemy import types as sqltypes

from recordlinker.config import settings


def get_session() -> orm.Session:
"""
Creates a new session to the MPI database and returns it.
"""
engine = create_engine(settings.db_uri)
Base.metadata.create_all(engine)
return orm.Session(engine)


class Base(orm.DeclarativeBase):
Expand All @@ -15,35 +29,125 @@ class Person(Base):

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
internal_id: orm.Mapped[uuid.UUID] = orm.mapped_column(default=uuid.uuid4)


class ExternalPerson(Base):
__tablename__ = "mpi_external_person"

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
person_id: orm.Mapped[int] = orm.mapped_column(ForeignKey("mpi_person.id"))
external_id: orm.Mapped[str] = orm.mapped_column(String(255))
source: orm.Mapped[str] = orm.mapped_column(String(255))
patients: orm.Mapped[list["Patient"]] = orm.relationship(back_populates="person")


class Patient(Base):
__tablename__ = "mpi_patient"

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
person_id: orm.Mapped[int] = orm.mapped_column(ForeignKey("mpi_person.id"))
data: orm.Mapped[dict] = orm.mapped_column(JSON)

person_id: orm.Mapped[int] = orm.mapped_column(schema.ForeignKey("mpi_person.id"))
person: orm.Mapped["Person"] = orm.relationship(back_populates="patients")
data: orm.Mapped[dict] = orm.mapped_column(sqltypes.JSON)
external_patient_id: orm.Mapped[str] = orm.mapped_column(sqltypes.String(255), nullable=True)
external_person_id: orm.Mapped[str] = orm.mapped_column(sqltypes.String(255), nullable=True)
external_person_source: orm.Mapped[str] = orm.mapped_column(sqltypes.String(100), nullable=True)
blocking_values: orm.Mapped[list["BlockingValue"]] = orm.relationship(back_populates="patient")


class BlockingKey(enum.Enum):
"""
Enum for the different types of blocking keys that can be used for patient
matching. This is the universe of all possible blocking keys that a user can
choose from when configuring their algorithm. When data is loaded into the
MPI, all possible BlockingValues will be created for the defined BlockingKeys.
However, only a subset will be used in matching, based on the configuration of
the algorithm. By defining them all upfront, we give the user flexibility in
adjusting their algorithm configuration without having to reload the data.

**HERE BE DRAGONS**: IN A PRODUCTION SYSTEM, THESE ENUMS SHOULD NOT BE CHANGED!!!
"""
BIRTHDATE = 1, "Date of Birth"
MRN = 2, "Last 4 chars of MRN"
SEX = 3, "Gender"
ZIP = 4, "Zip Code"
FIRST_NAME = 5, "First 4 chars of First Name"
LAST_NAME = 6, "First 4 chars of Last Name"

def __init__(self, id: int, description: str):
self.id = id
self.description = description

def to_value(self, data: dict) -> list[str]:
"""
Given a data dictionary of Patient PII data, return a generator of all
possible values for this Key. Many Keys will only have 1 possible value,
but some (like first name) could have multiple values.
"""
if self == BlockingKey.BIRTHDATE:
return self._extract_birthdate(data)
if self == BlockingKey.MRN:
return self._extract_mrn_last_four(data)
if self == BlockingKey.SEX:
return self._extract_sex(data)
if self == BlockingKey.ZIP:
return self._extract_zipcode(data)
if self == BlockingKey.FIRST_NAME:
return self._extract_first_name_first_four(data)
if self == BlockingKey.LAST_NAME:
return self._extract_last_name_first_four(data)
return []

def _extract_birthdate(self, data: dict) -> list[str]:
if "birthdate" in data:
val = data["birthdate"]
if not isinstance(val, (datetime.date, datetime.datetime)):
# if not an instance of date or datetime, try to parse it
val = dateutil.parser.parse(str(val))
return [val.strftime("%Y-%m-%d")]
return []

def _extract_mrn_last_four(self, data: dict) -> list[str]:
if "mrn" in data:
return [data["mrn"].strip()[-4:]]
return []

def _extract_sex(self, data: dict) -> list[str]:
if "sex" in data:
val = str(data["sex"]).lower().strip()
if val in ["m", "male"]:
return ["m"]
elif val in ["f", "female"]:
return ["f"]
return ["u"]
return []

def _extract_zipcode(self, data: dict) -> list[str]:
zipcodes = []
for address in data.get("address", []):
if isinstance(address, dict):
if "zip" in address:
zipcodes.append(str(address["zip"]).strip()[0:5])
return zipcodes

def _extract_first_name_first_four(self, data: dict) -> list[str]:
names = []
for name in data.get("name", []):
if isinstance(name, dict):
for given in name.get("given", []):
names.append(str(given)[:4])
return names

def _extract_last_name_first_four(self, data: dict) -> list[str]:
names = []
for name in data.get("name", []):
if isinstance(name, dict):
if "family" in name:
names.append(str(name["family"])[:4])
return names

class BlockingKey(Base):
__tablename__ = "mpi_blocking_key"

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
key: orm.Mapped[str] = orm.mapped_column(String(50), index=True)

class BlockingValue(Base):
__tablename__ = "mpi_blocking_value"
# create a composite index on patient_id, blockingkey and value
__table_args__ = (
schema.Index(
"idx_blocking_value_patient_key_value", "patient_id", "blockingkey", "value"
),
)

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
patient_id: orm.Mapped[int] = orm.mapped_column(ForeignKey("mpi_patient.id"))
blockingkey_id: orm.Mapped[int] = orm.mapped_column(ForeignKey("mpi_blocking_key.id"))
value: orm.Mapped[str] = orm.mapped_column(String(50), index=True)
patient_id: orm.Mapped[int] = orm.mapped_column(schema.ForeignKey("mpi_patient.id"))
patient: orm.Mapped["Patient"] = orm.relationship(back_populates="blocking_values")
blockingkey: orm.Mapped[int] = orm.mapped_column(sqltypes.Integer)
value: orm.Mapped[str] = orm.mapped_column(sqltypes.String(50))
Loading
Loading