Skip to content

Commit

Permalink
Add endpoint to return orphaned persons (#225)
Browse files Browse the repository at this point in the history
## Description
The PR adds an endpoint to return a paginated list of all the persons
with no members.

While working on this PR I also made some small changes to
`get_orphaned_patients`:

- added an explicit `ORDER_BY` after discovering some inconsistent
results when I included a `limit` but not a `cursor`. I also updated the
tests accordingly.
- genericized `schemas.PaginatedRefs` so that the paginated results for
both get orphaned patients and persons are the same.

## Related Issues
#163 

## Additional Notes
I spent a good amount of time thinking about how we should execute this
query to make it as efficient as possible. My working assumptions were
that:
1) the patient table will be larger than the person table, 
2) both tables will be large, and 
3) orphaned persons will be relatively rare.

I considered two approaches, "LEFT JOIN" (which I ultimately landed on)
and "NOT EXISTS", but I am open to hearing others.

LEFT JOIN approach:
```
SELECT p.*
FROM mpi_person p
LEFT JOIN mpi_patient pt ON pt.person_id = p.id
WHERE pt.id IS NULL
``` 

From `EXPLAIN QUERY PLAN`, we can see that we are scanning `mpi_person`,
using a Bloom filter when scanning the larger patient table to more
quickly eliminate rows that definitely do not match any rows in Person,
and uses a covering index on `mpi_patient.id`, which should keep things
as quick as possible.

NOT EXISTS approach:
```
SELECT *
FROM mpi_person p
WHERE NOT EXISTS (
    SELECT 1 
    FROM mpi_patient pt 
    WHERE pt.person_id = p.id
)
```
This approach is less efficient because of the subquery executes
multiple times checking for matching rows in Patient for each row in
Person. It would be more efficient if we add an index on
Patient.person_id, but I still think LEFT JOIN is a better choice given
the assumptions stated above (especially #1).

I also briefly considered a view, but given the number of updates to the
Patient and Person tables, I don't think this is our best path forward.

All that said, I am open to other approaches and would love to get
folks' thoughts.

---------

Co-authored-by: Eric Buckley <[email protected]>
  • Loading branch information
m-goggins and ericbuckley authored Feb 28, 2025
1 parent 1027b73 commit a2798c1
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 27 deletions.
33 changes: 32 additions & 1 deletion src/recordlinker/database/mpi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,41 @@ def get_orphaned_patients(
"""
Retrieve orphaned Patients in the MPI database, up to the provided limit.
"""
query = select(models.Patient).where(models.Patient.person_id.is_(None)).limit(limit)
query = (
select(models.Patient)
.where(models.Patient.person_id.is_(None))
.order_by(models.Patient.id)
.limit(limit)
)

# Apply cursor if provided
if cursor:
query = query.where(models.Patient.id > cursor)

return session.execute(query).scalars().all()


def get_orphaned_persons(
session: orm.Session,
limit: int | None = 50,
cursor: int | None = None,
) -> typing.Sequence[models.Person]:
"""
Retrieve orphaned Persons in the MPI database, up to the provided limit. If a
cursor (in the form of a person reference_id) is provided, only retrieve Persons
with a reference_id greater than the cursor.
"""
query = (
select(models.Person)
.outerjoin(models.Patient, models.Patient.person_id == models.Person.id)
.filter(models.Patient.id.is_(None))
.order_by(models.Person.id)
)
if cursor:
query = query.filter(models.Person.id > cursor)

query = query.limit(
limit
) # limit applied after cursor to ensure the limit is applied after the JOIN and starts from the cursor after the join

return session.execute(query).scalars().all()
10 changes: 5 additions & 5 deletions src/recordlinker/routes/patient_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def get_orphaned_patients(
session: orm.Session = fastapi.Depends(get_session),
limit: int | None = fastapi.Query(50, alias="limit", ge=1, le=1000),
cursor: uuid.UUID | None = fastapi.Query(None, alias="cursor"),
) -> schemas.PaginatedPatientRefs:
) -> schemas.PaginatedRefs:
"""
Retrieve patient_reference_id(s) for all Patients that are not linked to a Person.
"""
Expand All @@ -91,8 +91,8 @@ def get_orphaned_patients(

patients = service.get_orphaned_patients(session, limit, cur)
if not patients:
return schemas.PaginatedPatientRefs(
patients=[], meta=schemas.PaginatedMetaData(next_cursor=None, next=None)
return schemas.PaginatedRefs(
data=[], meta=schemas.PaginatedMetaData(next_cursor=None, next=None)
)
# Prepare the meta data
next_cursor = patients[-1].reference_id if len(patients) == limit else None
Expand All @@ -102,8 +102,8 @@ def get_orphaned_patients(
else None
)

return schemas.PaginatedPatientRefs(
patients=[p.reference_id for p in patients if p.reference_id],
return schemas.PaginatedRefs(
data=[p.reference_id for p in patients if p.reference_id],
meta=schemas.PaginatedMetaData(
next_cursor=next_cursor,
next=pydantic.HttpUrl(next_url) if next_url else None,
Expand Down
55 changes: 55 additions & 0 deletions src/recordlinker/routes/person_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import uuid

import fastapi
import pydantic
import sqlalchemy.orm as orm

from recordlinker import models
Expand Down Expand Up @@ -102,6 +103,60 @@ def update_person(
return schemas.PersonRef(person_reference_id=person.reference_id)


@router.get(
"/orphaned", summary="Retrieve orphaned persons", status_code=fastapi.status.HTTP_200_OK
)
def get_orphaned_persons(
request: fastapi.Request,
session: orm.Session = fastapi.Depends(get_session),
limit: int | None = fastapi.Query(50, alias="limit", ge=1, le=1000),
cursor: uuid.UUID | None = fastapi.Query(None, alias="cursor"),
) -> schemas.PaginatedRefs:
"""
Retrieve person_reference_id(s) for all Persons that are not linked to any Patients.
"""
# Check if the cursor is a valid Person reference_id
if cursor:
person = service.get_persons_by_reference_ids(session, cursor)
if not person or person[0] is None:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=[
{
"loc": ["query", "cursor"],
"msg": "Cursor is an invalid Person reference_id",
"type": "value_error",
}
],
)
# Replace the cursor with the Patient id instead of reference_id
cur = person[0].id
else:
cur = None

persons = service.get_orphaned_persons(session, limit, cur)
if not persons:
return schemas.PaginatedRefs(
data=[], meta=schemas.PaginatedMetaData(next_cursor=None, next=None)
)

# Prepare the meta data
next_cursor = persons[-1].reference_id if len(persons) == limit else None
next_url = (
f"{request.base_url}person/orphaned?limit={limit}&cursor={next_cursor}"
if next_cursor
else None
)

return schemas.PaginatedRefs(
data=[p.reference_id for p in persons if p.reference_id],
meta=schemas.PaginatedMetaData(
next_cursor=next_cursor,
next=pydantic.HttpUrl(next_url) if next_url else None,
),
)


@router.get(
"/{person_reference_id}",
summary="Retrieve a person cluster",
Expand Down
4 changes: 2 additions & 2 deletions src/recordlinker/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .mpi import ErrorDetail
from .mpi import ErrorResponse
from .mpi import PaginatedMetaData
from .mpi import PaginatedPatientRefs
from .mpi import PaginatedRefs
from .mpi import PatientCreatePayload
from .mpi import PatientInfo
from .mpi import PatientPersonRef
Expand Down Expand Up @@ -61,5 +61,5 @@
"ErrorDetail",
"ErrorResponse",
"PaginatedMetaData",
"PaginatedPatientRefs",
"PaginatedRefs",
]
4 changes: 2 additions & 2 deletions src/recordlinker/schemas/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ class PaginatedMetaData(pydantic.BaseModel):
next: pydantic.HttpUrl | None = None


class PaginatedPatientRefs(pydantic.BaseModel):
patients: list[uuid.UUID] = pydantic.Field(...)
class PaginatedRefs(pydantic.BaseModel):
data: list[uuid.UUID] = pydantic.Field(...)
meta: PaginatedMetaData | None
55 changes: 53 additions & 2 deletions tests/unit/database/test_mpi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,6 @@ def test_get_orphaned_patients_limit(self, session: Session):
assert len(mpi_service.get_orphaned_patients(session, limit=3)) == 2

def test_get_orphaned_patients_cursor(self, session: Session):
# ordered_uuids.sort()

patient1 = models.Patient(person=None, data={"id": 1})
patient2 = models.Patient(person=None, data={"id": 2})
patient3 = models.Patient(person=None, data={"id": 3})
Expand All @@ -949,3 +947,56 @@ def test_get_orphaned_patients_cursor(self, session: Session):
patient2,
patient3,
]


class TestGetOrphanedPersons:
def test_get_orphaned_persons_success(self, session: Session):
person1 = models.Person()
person2 = models.Person()
patient1 = models.Patient(person=person1, data={})
session.add_all([patient1, person2])
session.flush()
assert session.query(models.Patient).count() == 1
assert session.query(models.Person).count() == 2
assert mpi_service.get_orphaned_persons(session) == [person2]

def test_get_orphaned_persons_no_persons(self, session: Session):
patient = models.Patient(person=models.Person(), data={})
session.add(patient)
session.flush()
assert mpi_service.get_orphaned_persons(session) == []

def test_get_orphaned_persons_limit(self, session: Session):
# Checks that limit is correctly applied
person1 = models.Person()
person2 = models.Person()
person3 = models.Person()
patient = models.Patient(person=person1, data={})
session.add_all([patient, person2, person3])
session.flush()

assert len(mpi_service.get_orphaned_persons(session, limit=1)) == 1
assert len(mpi_service.get_orphaned_persons(session, limit=2)) == 2
assert len(mpi_service.get_orphaned_persons(session, limit=3)) == 2

def test_get_orphaned_persons_cursor(self, session: Session):
# Checks that cursor is correctly applied
person1 = models.Person(id=1)
person2 = models.Person(id=2)
person3 = models.Person(id=3)
person4 = models.Person(id=4)
patient = models.Patient(person=person4, data={})
session.add_all([patient, person1, person2, person3])
session.flush()

assert mpi_service.get_orphaned_persons(session, limit=1, cursor=person1.id) == [person2]
assert mpi_service.get_orphaned_persons(session, limit=1, cursor=person2.id) == [person3]
assert mpi_service.get_orphaned_persons(session, limit=2, cursor=person2.id) == [person3]
assert mpi_service.get_orphaned_persons(session, limit=2, cursor=person1.id) == [
person2,
person3,
]
assert mpi_service.get_orphaned_persons(session, limit=5, cursor=person1.id) == [
person2,
person3,
]
52 changes: 37 additions & 15 deletions tests/unit/routes/test_patient_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,47 @@ def test_get_orphaned_patients(self, client):
response = client.get("/patient/orphaned")
assert response.status_code == 200
assert response.json() == {
"patients": [str(patient1.reference_id)],
"data": [str(patient1.reference_id)],
"meta": {"next_cursor": None, "next": None},
}

def test_no_orphaned_patients(self, client):
response = client.get("/patient/orphaned")
assert response.status_code == 200
assert response.json() == {
"patients": [],
"data": [],
"meta": {"next_cursor": None, "next": None},
}

def test_get_orphaned_patients_with_cursor(self, client):
ordered_uuids = [uuid.uuid4() for _ in range(3)]
ordered_uuids.sort()
def test_get_orphaned_patients_with_limit(self, client):
patient1 = models.Patient(person=None, data={"id": 1})
patient2 = models.Patient(person=None, data={"id": 2})
client.session.add_all([patient1, patient2])
client.session.flush()

response = client.get("/patient/orphaned?limit=1")
assert response.status_code == 200
assert response.json() == {
"data": [str(patient1.reference_id)],
"meta": {
"next_cursor": str(patient1.reference_id),
"next": f"http://testserver/patient/orphaned?limit=1&cursor={str(patient1.reference_id)}",
},
}

patient1 = models.Patient(person=None, reference_id=ordered_uuids[0])
patient2 = models.Patient(person=None, reference_id=ordered_uuids[1])
patient3 = models.Patient(person=None, reference_id=ordered_uuids[2])
response = client.get("/patient/orphaned?limit=2")
assert response.json() == {
"data": [str(patient1.reference_id), str(patient2.reference_id)],
"meta": {
"next_cursor": str(patient2.reference_id),
"next": f"http://testserver/patient/orphaned?limit=2&cursor={str(patient2.reference_id)}",
},
}

def test_get_orphaned_patients_with_cursor(self, client):
patient1 = models.Patient(person=None, data={"id": 1})
patient2 = models.Patient(person=None, data={"id": 2})
patient3 = models.Patient(person=None, data={"id": 3})
client.session.add_all([patient1, patient2, patient3])
client.session.flush()

Expand All @@ -209,27 +231,27 @@ def test_get_orphaned_patients_with_cursor(self, client):
assert response.status_code == 200

assert response.json() == {
"patients": [str(patient2.reference_id)],
"data": [str(patient2.reference_id)],
"meta": {
"next_cursor": str(ordered_uuids[1]),
"next": f"http://testserver/patient/orphaned?limit=1&cursor={str(ordered_uuids[1])}",
"next_cursor": str(patient2.reference_id),
"next": f"http://testserver/patient/orphaned?limit=1&cursor={str(patient2.reference_id)}",
},
}

# Retrieve 2 patients after patient1, return cursor for patient3
response = client.get(f"/patient/orphaned?limit=2&cursor={patient1.reference_id}")
assert response.json() == {
"patients": [str(patient2.reference_id), str(patient3.reference_id)],
"data": [str(patient2.reference_id), str(patient3.reference_id)],
"meta": {
"next_cursor": str(ordered_uuids[2]),
"next": f"http://testserver/patient/orphaned?limit=2&cursor={ordered_uuids[2]}",
"next_cursor": str(patient3.reference_id),
"next": f"http://testserver/patient/orphaned?limit=2&cursor={str(patient3.reference_id)}",
},
}

# Retrieve the 2 orphaned patients after patient1, return no cursor
response = client.get(f"/patient/orphaned?limit=5&cursor={patient1.reference_id}")
assert response.json() == {
"patients": [
"data": [
str(patient2.reference_id),
str(patient3.reference_id),
],
Expand Down
Loading

0 comments on commit a2798c1

Please sign in to comment.