|
| 1 | +import logging |
| 2 | +from typing import Any, Dict, List, Optional |
| 3 | + |
| 4 | +from fastapi import HTTPException, status |
| 5 | +from fastapi.encoders import jsonable_encoder |
| 6 | +from sqlalchemy import and_, func |
| 7 | +from sqlalchemy.orm import Session, joinedload |
| 8 | + |
| 9 | +from agr_literature_service.api.models.person_model import PersonModel |
| 10 | +from agr_literature_service.api.models.email_model import EmailModel |
| 11 | +from agr_literature_service.api.models.person_setting_model import PersonSettingModel |
| 12 | +from agr_literature_service.api.crud.user_utils import map_to_user_id |
| 13 | + |
| 14 | +logger = logging.getLogger(__name__) |
| 15 | + |
| 16 | + |
| 17 | +def normalize_email(s: str) -> str: |
| 18 | + return s.strip().lower() |
| 19 | + |
| 20 | + |
| 21 | +def _non_empty_or_422(field: str, value: Optional[str]) -> str: |
| 22 | + v = (value or "").strip() |
| 23 | + if not v: |
| 24 | + raise HTTPException( |
| 25 | + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, |
| 26 | + detail=f"{field} must be a non-empty string", |
| 27 | + ) |
| 28 | + return v |
| 29 | + |
| 30 | + |
| 31 | +def _assert_person_exists(db: Session, person_id: int) -> None: |
| 32 | + exists = db.query(PersonModel.person_id).filter(PersonModel.person_id == person_id).first() |
| 33 | + if not exists: |
| 34 | + raise HTTPException( |
| 35 | + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, |
| 36 | + detail=f"person_id {person_id} does not exist", |
| 37 | + ) |
| 38 | + |
| 39 | + |
| 40 | +def _assert_default_unique( |
| 41 | + db: Session, |
| 42 | + person_id: int, |
| 43 | + component_name: str, |
| 44 | + exclude_person_setting_id: Optional[int] = None, |
| 45 | +) -> None: |
| 46 | + """ |
| 47 | + Enforce at-most-one default per (person_id, component_name). |
| 48 | + This mirrors the DB partial unique index at the application layer |
| 49 | + to give a friendlier error before hitting the constraint. |
| 50 | + """ |
| 51 | + q = ( |
| 52 | + db.query(PersonSettingModel.person_setting_id) |
| 53 | + .filter(PersonSettingModel.person_id == person_id) |
| 54 | + .filter(PersonSettingModel.component_name == component_name) |
| 55 | + .filter(PersonSettingModel.default_setting.is_(True)) |
| 56 | + ) |
| 57 | + if exclude_person_setting_id is not None: |
| 58 | + q = q.filter(PersonSettingModel.person_setting_id != exclude_person_setting_id) |
| 59 | + |
| 60 | + existing = q.first() |
| 61 | + if existing: |
| 62 | + raise HTTPException( |
| 63 | + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, |
| 64 | + detail=( |
| 65 | + "A default setting already exists for this (person_id, component_name). " |
| 66 | + "Unset the existing default or set this row to default_setting = false." |
| 67 | + ), |
| 68 | + ) |
| 69 | + |
| 70 | + |
| 71 | +def create(db: Session, payload) -> PersonSettingModel: |
| 72 | + """ |
| 73 | + Create a PersonSetting row. |
| 74 | + Enforces: |
| 75 | + - person exists |
| 76 | + - non-empty component_name / setting_name |
| 77 | + - only one default per (person_id, component_name) |
| 78 | + """ |
| 79 | + data: Dict[str, Any] = jsonable_encoder(payload) |
| 80 | + |
| 81 | + if "created_by" in data and data["created_by"] is not None: |
| 82 | + data["created_by"] = map_to_user_id(data["created_by"], db) |
| 83 | + if "updated_by" in data and data["updated_by"] is not None: |
| 84 | + data["updated_by"] = map_to_user_id(data["updated_by"], db) |
| 85 | + |
| 86 | + person_id = data.get("person_id") |
| 87 | + if person_id is None: |
| 88 | + raise HTTPException( |
| 89 | + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, |
| 90 | + detail="person_id is required", |
| 91 | + ) |
| 92 | + _assert_person_exists(db, int(person_id)) |
| 93 | + |
| 94 | + component_name = _non_empty_or_422("component_name", data.get("component_name")) |
| 95 | + setting_name = _non_empty_or_422("setting_name", data.get("setting_name")) |
| 96 | + |
| 97 | + is_default = bool(data.get("default_setting", False)) |
| 98 | + if is_default: |
| 99 | + _assert_default_unique(db, person_id, component_name) |
| 100 | + |
| 101 | + obj = PersonSettingModel( |
| 102 | + person_id=person_id, |
| 103 | + component_name=component_name, |
| 104 | + setting_name=setting_name, |
| 105 | + default_setting=is_default, |
| 106 | + json_settings=data.get("json_settings") or {}, |
| 107 | + created_by=data.get("created_by"), |
| 108 | + updated_by=data.get("updated_by"), |
| 109 | + ) |
| 110 | + db.add(obj) |
| 111 | + db.commit() |
| 112 | + db.refresh(obj) |
| 113 | + return obj |
| 114 | + |
| 115 | + |
| 116 | +def destroy(db: Session, person_setting_id: int) -> None: |
| 117 | + obj: Optional[PersonSettingModel] = ( |
| 118 | + db.query(PersonSettingModel) |
| 119 | + .filter(PersonSettingModel.person_setting_id == person_setting_id) |
| 120 | + .first() |
| 121 | + ) |
| 122 | + if not obj: |
| 123 | + raise HTTPException( |
| 124 | + status_code=status.HTTP_404_NOT_FOUND, |
| 125 | + detail=f"person_setting_id {person_setting_id} not found", |
| 126 | + ) |
| 127 | + db.delete(obj) |
| 128 | + db.commit() |
| 129 | + |
| 130 | + |
| 131 | +def patch(db: Session, person_setting_id: int, patch_dict: Dict[str, Any]) -> Dict[str, Any]: |
| 132 | + obj: Optional[PersonSettingModel] = ( |
| 133 | + db.query(PersonSettingModel) |
| 134 | + .filter(PersonSettingModel.person_setting_id == person_setting_id) |
| 135 | + .first() |
| 136 | + ) |
| 137 | + if not obj: |
| 138 | + raise HTTPException( |
| 139 | + status_code=status.HTTP_404_NOT_FOUND, |
| 140 | + detail=f"person_setting_id {person_setting_id} not found", |
| 141 | + ) |
| 142 | + |
| 143 | + data = jsonable_encoder(patch_dict) |
| 144 | + |
| 145 | + if "created_by" in data and data["created_by"] is not None: |
| 146 | + data["created_by"] = map_to_user_id(data["created_by"], db) |
| 147 | + if "updated_by" in data and data["updated_by"] is not None: |
| 148 | + data["updated_by"] = map_to_user_id(data["updated_by"], db) |
| 149 | + |
| 150 | + # Prepare new target values for uniqueness check |
| 151 | + new_person_id = int(data.get("person_id", obj.person_id)) |
| 152 | + new_component_name = data.get("component_name", obj.component_name) |
| 153 | + new_default_setting = data.get("default_setting", obj.default_setting) |
| 154 | + |
| 155 | + if "person_id" in data and data["person_id"] is not None: |
| 156 | + _assert_person_exists(db, new_person_id) |
| 157 | + |
| 158 | + if "component_name" in data and data["component_name"] is not None: |
| 159 | + new_component_name = _non_empty_or_422("component_name", new_component_name) |
| 160 | + |
| 161 | + if "setting_name" in data and data["setting_name"] is not None: |
| 162 | + data["setting_name"] = _non_empty_or_422("setting_name", data["setting_name"]) |
| 163 | + |
| 164 | + # If this row is (or will become) the default, ensure no other default conflicts |
| 165 | + if bool(new_default_setting): |
| 166 | + _assert_default_unique( |
| 167 | + db, |
| 168 | + person_id=new_person_id, |
| 169 | + component_name=new_component_name, |
| 170 | + exclude_person_setting_id=person_setting_id, |
| 171 | + ) |
| 172 | + |
| 173 | + # Only update scalar fields defined on the table |
| 174 | + ALLOWED = {"person_id", "component_name", "setting_name", "default_setting", "json_settings"} |
| 175 | + for field, value in data.items(): |
| 176 | + if field in ALLOWED: |
| 177 | + setattr(obj, field, value) |
| 178 | + |
| 179 | + db.commit() |
| 180 | + return {"message": "updated"} |
| 181 | + |
| 182 | + |
| 183 | +def show(db: Session, person_setting_id: int) -> PersonSettingModel: |
| 184 | + obj: Optional[PersonSettingModel] = ( |
| 185 | + db.query(PersonSettingModel) |
| 186 | + .options(joinedload(PersonSettingModel.person)) |
| 187 | + .filter(PersonSettingModel.person_setting_id == person_setting_id) |
| 188 | + .first() |
| 189 | + ) |
| 190 | + if not obj: |
| 191 | + raise HTTPException( |
| 192 | + status_code=status.HTTP_404_NOT_FOUND, |
| 193 | + detail=f"person_setting_id {person_setting_id} not found", |
| 194 | + ) |
| 195 | + return obj |
| 196 | + |
| 197 | + |
| 198 | +# ---------- Lookup helpers used by router ---------- |
| 199 | + |
| 200 | +def get_by_okta_id(db: Session, okta_id: str) -> List[PersonSettingModel]: |
| 201 | + if not okta_id: |
| 202 | + return [] |
| 203 | + return ( |
| 204 | + db.query(PersonSettingModel) |
| 205 | + .join(PersonModel, PersonModel.person_id == PersonSettingModel.person_id) |
| 206 | + .options(joinedload(PersonSettingModel.person)) |
| 207 | + .filter(PersonModel.okta_id == okta_id) |
| 208 | + .order_by(PersonSettingModel.component_name.asc(), PersonSettingModel.setting_name.asc()) |
| 209 | + .all() |
| 210 | + ) |
| 211 | + |
| 212 | + |
| 213 | +def get_by_email(db: Session, email: str) -> List[PersonSettingModel]: |
| 214 | + if not email: |
| 215 | + return [] |
| 216 | + email_norm = normalize_email(email) |
| 217 | + return ( |
| 218 | + db.query(PersonSettingModel) |
| 219 | + .join(PersonModel, PersonModel.person_id == PersonSettingModel.person_id) |
| 220 | + .join(EmailModel, and_(EmailModel.person_id == PersonModel.person_id)) |
| 221 | + .options(joinedload(PersonSettingModel.person)) |
| 222 | + .filter(func.lower(EmailModel.email_address) == email_norm) |
| 223 | + .order_by(PersonSettingModel.component_name.asc(), PersonSettingModel.setting_name.asc()) |
| 224 | + .all() |
| 225 | + ) |
| 226 | + |
| 227 | + |
| 228 | +def find_by_name(db: Session, name: str) -> List[PersonSettingModel]: |
| 229 | + """ |
| 230 | + Case-insensitive partial match on Person.display_name. |
| 231 | + """ |
| 232 | + if not name: |
| 233 | + return [] |
| 234 | + pattern = f"%{name.strip()}%" |
| 235 | + return ( |
| 236 | + db.query(PersonSettingModel) |
| 237 | + .join(PersonModel, PersonModel.person_id == PersonSettingModel.person_id) |
| 238 | + .options(joinedload(PersonSettingModel.person)) |
| 239 | + .filter(PersonModel.display_name.ilike(pattern)) |
| 240 | + .order_by(PersonModel.display_name.asc(), PersonSettingModel.component_name.asc(), PersonSettingModel.setting_name.asc()) |
| 241 | + .all() |
| 242 | + ) |
0 commit comments