| 
 | 1 | +import pytest  | 
 | 2 | +from datetime import timedelta  | 
 | 3 | +from sedate import utcnow  | 
 | 4 | +from sqlalchemy import select, Column, Integer  | 
 | 5 | + | 
 | 6 | +from privatim.cli.apply_data_retention_policy import delete_old_records  | 
 | 7 | +from privatim.models import Consultation, SearchableFile  | 
 | 8 | +from privatim.orm import Base  | 
 | 9 | + | 
 | 10 | + | 
 | 11 | +def create_consultation(  | 
 | 12 | +    session, user, title, days_old, is_deleted=False, with_file=False  | 
 | 13 | +):  | 
 | 14 | +    """Helper function to create a consultation"""  | 
 | 15 | + | 
 | 16 | +    consultation = Consultation(  | 
 | 17 | +        title=title,  | 
 | 18 | +        creator=user,  | 
 | 19 | +        description='Test description',  | 
 | 20 | +        status='Created',  | 
 | 21 | +    )  | 
 | 22 | + | 
 | 23 | +    if with_file:  | 
 | 24 | +        file = SearchableFile(  | 
 | 25 | +            filename='test.txt',  | 
 | 26 | +            content=b'Test content',  | 
 | 27 | +        )  | 
 | 28 | +        session.add(file)  | 
 | 29 | +        consultation.files.append(file)  | 
 | 30 | + | 
 | 31 | +    consultation.updated = utcnow() - timedelta(days=days_old)  | 
 | 32 | +    consultation.deleted = is_deleted  | 
 | 33 | +    session.add(consultation)  | 
 | 34 | +    session.flush()  | 
 | 35 | +    return consultation  | 
 | 36 | + | 
 | 37 | + | 
 | 38 | +def test_delete_old_records_consultations(session, user):  | 
 | 39 | +    # Create various consultations  | 
 | 40 | +    create_consultation(session, user, 'Recent Active', 10)  | 
 | 41 | +    create_consultation(session, user, 'Old Active', 40)  | 
 | 42 | +    old_deleted = create_consultation(  | 
 | 43 | +        session, user, 'Old Deleted', 40, is_deleted=True  | 
 | 44 | +    )  | 
 | 45 | +    old_deleted_with_file = create_consultation(  | 
 | 46 | +        session,  | 
 | 47 | +        user,  | 
 | 48 | +        'Old Deleted with File',  | 
 | 49 | +        40,  | 
 | 50 | +        is_deleted=True,  | 
 | 51 | +        with_file=True,  | 
 | 52 | +    )  | 
 | 53 | +    create_consultation(session, user, 'Recent Deleted', 10, is_deleted=True)  | 
 | 54 | + | 
 | 55 | +    session.flush()  | 
 | 56 | + | 
 | 57 | +    # Run the delete_old_records function  | 
 | 58 | +    deleted_ids = delete_old_records(session, Consultation)  | 
 | 59 | + | 
 | 60 | +    # Check that the correct consultations were deleted  | 
 | 61 | +    assert len(deleted_ids) == 2  | 
 | 62 | +    assert old_deleted.id in deleted_ids  | 
 | 63 | +    assert old_deleted_with_file.id in deleted_ids  | 
 | 64 | + | 
 | 65 | +    with session.no_soft_delete_filter():  | 
 | 66 | +        remaining_consultations = session.scalars(select(Consultation)).all()  | 
 | 67 | +        remaining_ids = [c.id for c in remaining_consultations]  | 
 | 68 | + | 
 | 69 | +        assert len(remaining_consultations) == 3  | 
 | 70 | +        assert old_deleted.id not in remaining_ids  | 
 | 71 | +        assert old_deleted_with_file.id not in remaining_ids  | 
 | 72 | + | 
 | 73 | +        # Check that associated files were also deleted  | 
 | 74 | +        remaining_files = session.scalars(select(SearchableFile)).all()  | 
 | 75 | +        assert len(remaining_files) == 0  | 
 | 76 | + | 
 | 77 | + | 
 | 78 | +def test_delete_old_records_no_deletions(session, user):  | 
 | 79 | +    # Create only recent or active consultations  | 
 | 80 | +    create_consultation(session, user, 'Recent Active', 10)  | 
 | 81 | +    create_consultation(session, user, 'Recent Deleted', 10, is_deleted=True)  | 
 | 82 | + | 
 | 83 | +    session.flush()  | 
 | 84 | + | 
 | 85 | +    # Run the delete_old_records function  | 
 | 86 | +    deleted_ids = delete_old_records(session, Consultation)  | 
 | 87 | + | 
 | 88 | +    # Check that no consultations were deleted  | 
 | 89 | +    assert len(deleted_ids) == 0  | 
 | 90 | + | 
 | 91 | +    with session.no_soft_delete_filter():  | 
 | 92 | +        remaining_consultations = session.scalars(select(Consultation)).all()  | 
 | 93 | +        assert len(remaining_consultations) == 2  | 
 | 94 | + | 
 | 95 | + | 
 | 96 | +def test_delete_old_records_invalid_model(session):  | 
 | 97 | +    class InvalidModel(Base):  | 
 | 98 | +        __tablename__ = 'invalid_model'  | 
 | 99 | +        id = Column(Integer, primary_key=True)  | 
 | 100 | + | 
 | 101 | +    with pytest.raises(ValueError, match="does not support soft delete"):  | 
 | 102 | +        delete_old_records(session, InvalidModel)  | 
0 commit comments