Skip to content

Commit

Permalink
Fixes delete (SEA-1429).
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrillkuettel committed Jul 31, 2024
1 parent 0184fe1 commit bdec6b4
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 30 deletions.
19 changes: 12 additions & 7 deletions src/privatim/models/consultation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ def __init__(
id: Mapped[UUIDStrPK]

name: Mapped[str] = mapped_column(nullable=False)
consultations = relationship(
'Consultation', back_populates='status'

consultations: Mapped['Consultation'] = relationship(
'Consultation',
back_populates='status',
)
consultation_id: Mapped[UUIDStrType] = mapped_column(
ForeignKey('consultations.id'),
nullable=True
ForeignKey('consultations.id', ondelete='CASCADE'),
nullable=True,
)

def __repr__(self) -> str:
Expand All @@ -70,8 +72,9 @@ def __init__(
consultation: Mapped['Consultation'] = relationship(
'Consultation', back_populates='secondary_tags',
)

consultation_id: Mapped[UUIDStrType] = mapped_column(
ForeignKey('consultations.id'),
ForeignKey('consultations.id', ondelete='CASCADE'),
nullable=True
)

Expand Down Expand Up @@ -138,13 +141,15 @@ def __init__(

status: Mapped[Status | None] = relationship(
'Status', back_populates='consultations',
)
cascade="all, delete-orphan", )

created: Mapped[datetime] = mapped_column(default=utcnow)
updated: Mapped[datetime] = mapped_column(default=utcnow, onupdate=utcnow)

secondary_tags: Mapped[list[Tag]] = relationship(
'Tag', back_populates='consultation',
'Tag',
back_populates='consultation',
cascade='all, delete-orphan'
)

# in theory this could be nullable=False, but let's avoid problems with
Expand Down
6 changes: 1 addition & 5 deletions src/privatim/static/js/custom/custom.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ document.addEventListener('DOMContentLoaded', function () {
setupCommentAnswerField();
makeConsultationsClickable();

if (window.location.href.includes('consultations/')) {
document.querySelectorAll('.upload-widget.without-data').forEach(el => {
el.style.display = 'none';
});
}

});

function makeConsultationsClickable() {
Expand Down
71 changes: 54 additions & 17 deletions src/privatim/views/consultations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging
from markupsafe import Markup
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from privatim.forms.add_comment import CommentForm, NestedCommentForm
from privatim.forms.consultation_form import ConsultationForm
from privatim.models import Consultation
Expand All @@ -14,6 +16,7 @@

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from privatim.orm import FilteredSession
from pyramid.interfaces import IRequest
from privatim.types import RenderDataOrRedirect, RenderData

Expand Down Expand Up @@ -230,8 +233,8 @@ def create_consultation_from_form(
comments=prev.comments,
is_latest_version=1
)
prev.replaced_by = new_consultation
prev.is_latest_version = 0
prev.replaced_by = new_consultation
session.add(prev)
new_consultation.reindex_files()
return new_consultation
Expand Down Expand Up @@ -275,27 +278,61 @@ def edit_consultation_view(
}


def delete_consultation_chain(
session: 'FilteredSession', consultation: Consultation
) -> list[str]:
"""
Go backwards through the version history of the consultations linked
list and delete all of them. We need to make sure we delete associated
Status and SearchableAssociatedFile from association table.
"""
with session.no_consultation_filter():
# Gather all IDs in the chain
ids_to_delete = []
current = consultation
ids_to_delete.append(str(current.id))
while current:
current = current.previous_version # type: ignore
if current is not None:
ids_to_delete.append(str(current.id))

# Fetch all consultations with their associated status and files
consultations = (
session.execute(
select(Consultation)
.options(
joinedload(Consultation.status),
joinedload(Consultation.files),
)
.where(Consultation.id.in_(ids_to_delete))
)
.unique()
.scalars()
.all()
)

# Delete consultations one by one
for consultation in consultations:
# The Status and files will be automatically deleted due to cascade
session.delete(consultation)

session.flush()

return ids_to_delete


def delete_consultation_view(
context: Consultation, request: 'IRequest'
) -> 'RenderDataOrRedirect':
session = request.dbsession

# from sqlalchemy_file.storage import StorageManager
# for file in context.files:
# try:
# # be extra cautious and delete the file first
# path = file.file.path
# StorageManager.delete_file(path)
# except Exception as e:
# log.error(f'StorageManager deleting file: {path}; {e}')

session.delete(context)

# with contextlib.suppress(ObjectDoesNotExistError):
# request.tm.commit()
start_id = context.id
start = session.get(Consultation, start_id)
if start is not None:
delete_consultation_chain(session, start) # type: ignore[arg-type]
message = _('Successfully deleted consultation.')
if not request.is_xhr:
request.messages.add(message, 'success')

target_url = request.route_url('activities')
message = _('Successfully deleted consultation.')
if not request.is_xhr:
request.messages.add(message, 'success')
return HTTPFound(location=target_url)
93 changes: 92 additions & 1 deletion tests/views/client/test_views_consultation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from privatim.models import User
from privatim.models import User, SearchableFile
from privatim.models.comment import Comment
from privatim.models.consultation import Status, Consultation
from sqlalchemy import select, exists, func
from webtest.forms import Upload

from privatim.views.consultations import delete_consultation_chain


def test_view_consultation(client):

Expand Down Expand Up @@ -238,3 +240,92 @@ def test_edit_consultation_without_files(client):

assert 'BE' in page
assert 'LU' in page


def get_files_by_filename(session, filenames):
with session.no_consultation_filter():
query = (
select(SearchableFile)
.filter(
SearchableFile.filename.in_(filenames),
)
)
result = session.execute(query)
files = result.scalars().all()
found_files = {file.filename: file for file in files}
# missing_files = set(filenames) - set(found_files.keys())
return found_files


def test_edit_and_delete_consultation_chain(client, pdf_vemz):
session = client.db
client.login_admin()

# Create a new consultation
page = client.get('/consultations')
page = page.click('Vernehmlassung Erfassen')
page.form['title'] = 'Initial Consultation'
page.form['description'] = 'Initial description'
page.form['recommendation'] = 'Initial recommendation'
page.form['status'] = '1'
page.form['secondary_tags'] = ['AG', 'ZH']
page.form['files'] = Upload(*pdf_vemz)
page.form.submit().follow()

initial_consultation = session.execute(
select(Consultation).filter_by(description='Initial description')
).scalar_one()
initial_id = initial_consultation.id

# Edit the consultation to create a new version
page = client.get(f'/consultations/{str(initial_id)}/edit')
page.form['title'] = 'Updated Consultation'
page.form['description'] = 'Updated description'
page.form['recommendation'] = 'Updated recommendation'
page.form['status'] = '2'
page.form['secondary_tags'] = ['BE', 'LU']
page.form['files'] = Upload('UpdatedTest.txt',
b'Updated file content.')
page.form.submit().follow()

# check files exisit query
# filename1 = 'search_test_privatim_Vernehmlassung_VEMZ.pdf'
# filename2 = 'UpdatedTest.txt'

# assert (
# 'search_test_privatim_Vernehmlassung_VEMZ.pdf' in files
# ), "Expected file not found"
# assert 'UpdatedTest.txt' in files, "Expected file not found"

updated_consultation = session.execute(
select(Consultation).filter_by(is_latest_version=1)
).scalar_one()
updated_id = updated_consultation.id

# # Verify the chain
with session.no_consultation_filter():
initial_consultation = session.execute(
select(Consultation).filter_by(is_latest_version=0)
).scalar_one()
updated_consultation = session.execute(
select(Consultation).filter_by(is_latest_version=1)
).scalar_one()

# assert initial_consultation.replaced_by == updated_consultation
assert updated_consultation.previous_version == initial_consultation

# Delete the consultation chain
deleted_ids = delete_consultation_chain(session, updated_consultation)

# Verify deletions
assert len(deleted_ids) == 2
assert initial_id in deleted_ids
assert updated_id in deleted_ids

# Verify consultations no longer exist in the database

with session.no_consultation_filter():
assert not session.query(Consultation).count()
# Verify related Status objects have been deleted
assert session.execute(select(Status).filter(
Status.consultation_id.in_(deleted_ids))).first() is None

0 comments on commit bdec6b4

Please sign in to comment.