diff --git a/src/alembic/env.py b/src/alembic/env.py
index 387b0dd..b3338bf 100644
--- a/src/alembic/env.py
+++ b/src/alembic/env.py
@@ -8,6 +8,7 @@
import auth.tables
import blog.tables
import database
+import elections.tables
import officers.tables
from alembic import context
diff --git a/src/alembic/versions/243190df5588_create_election_tables.py b/src/alembic/versions/243190df5588_create_election_tables.py
new file mode 100644
index 0000000..c7197c8
--- /dev/null
+++ b/src/alembic/versions/243190df5588_create_election_tables.py
@@ -0,0 +1,61 @@
+"""create election tables
+
+Revision ID: 243190df5588
+Revises: 43f71e4bd6fc
+Create Date: 2024-08-10 08:32:54.037614
+
+"""
+from collections.abc import Sequence
+from typing import Union
+
+import sqlalchemy as sa
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "243190df5588"
+down_revision: str | None = "2a6ea95342dc"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "election",
+ sa.Column("slug", sa.String(length=64), nullable=False),
+ sa.Column("name", sa.String(length=64), nullable=False),
+ sa.Column("type", sa.String(length=64), default="general_election"),
+ sa.Column("datetime_start_nominations", sa.DateTime(), nullable=False),
+ sa.Column("datetime_start_voting", sa.DateTime(), nullable=False),
+ sa.Column("datetime_end_voting", sa.DateTime(), nullable=False),
+ sa.Column("survey_link", sa.String(length=300), nullable=True),
+ sa.PrimaryKeyConstraint("slug")
+ )
+ op.create_table(
+ "election_nominee",
+ sa.Column("computing_id", sa.String(length=32), nullable=False),
+ sa.Column("full_name", sa.String(length=64), nullable=False),
+ sa.Column("facebook", sa.String(length=128), nullable=True),
+ sa.Column("instagram", sa.String(length=128), nullable=True),
+ sa.Column("email", sa.String(length=64), nullable=True),
+ sa.Column("discord", sa.String(length=32), nullable=True),
+ sa.Column("discord_id", sa.String(length=32), nullable=True),
+ sa.Column("discord_username", sa.String(length=32), nullable=True),
+ sa.PrimaryKeyConstraint("computing_id")
+ )
+ op.create_table(
+ "nominee_application",
+ sa.Column("computing_id", sa.String(length=32), nullable=False),
+ sa.Column("nominee_election", sa.String(length=32), nullable=False),
+ sa.Column("speech", sa.Text(), nullable=True),
+ sa.Column("position", sa.String(length=64), nullable=False),
+ sa.ForeignKeyConstraint(["computing_id"], ["election_nominee.computing_id"]),
+ sa.ForeignKeyConstraint(["nominee_election"], ["election.slug"]),
+ sa.PrimaryKeyConstraint("computing_id", "nominee_election")
+ )
+
+
+def downgrade() -> None:
+ op.drop_table("nominee_application")
+ op.drop_table("election_nominee")
+ op.drop_table("election")
diff --git a/src/elections/crud.py b/src/elections/crud.py
new file mode 100644
index 0000000..34f264b
--- /dev/null
+++ b/src/elections/crud.py
@@ -0,0 +1,51 @@
+import logging
+
+import sqlalchemy
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from elections.tables import Election
+
+_logger = logging.getLogger(__name__)
+
+async def get_election(db_session: AsyncSession, election_slug: str) -> Election | None:
+ return await db_session.scalar(
+ sqlalchemy
+ .select(Election)
+ .where(Election.slug == election_slug)
+ )
+
+async def create_election(db_session: AsyncSession, election: Election) -> None:
+ """
+ Creates a new election with given parameters.
+ Does not validate if an election _already_ exists
+ """
+ db_session.add(election)
+
+async def delete_election(db_session: AsyncSession, slug: str) -> None:
+ """
+ Deletes a given election by its slug.
+ Does not validate if an election exists
+ """
+ await db_session.execute(
+ sqlalchemy
+ .delete(Election)
+ .where(Election.slug == slug)
+ )
+
+async def update_election(db_session: AsyncSession, new_election: Election) -> bool:
+ """
+ You attempting to change the name or slug will fail. Instead, you must create a new election.
+ """
+ target_slug = new_election.slug
+ target_election = await get_election(db_session, target_slug)
+
+ if target_election is None:
+ return False
+ else:
+ await db_session.execute(
+ sqlalchemy
+ .update(Election)
+ .where(Election.slug == target_slug)
+ .values(new_election.to_update_dict())
+ )
+ return True
diff --git a/src/elections/tables.py b/src/elections/tables.py
new file mode 100644
index 0000000..1125a6d
--- /dev/null
+++ b/src/elections/tables.py
@@ -0,0 +1,96 @@
+from sqlalchemy import (
+ Column,
+ DateTime,
+ ForeignKey,
+ PrimaryKeyConstraint,
+ String,
+ Text,
+)
+
+from constants import (
+ COMPUTING_ID_LEN,
+ DISCORD_ID_LEN,
+ DISCORD_NAME_LEN,
+ DISCORD_NICKNAME_LEN,
+)
+from database import Base
+
+election_types = ["general_election", "by_election", "council_rep_election"]
+
+MAX_ELECTION_NAME = 64
+MAX_ELECTION_SLUG = 64
+
+class Election(Base):
+ __tablename__ = "election"
+
+ # Slugs are unique identifiers
+ slug = Column(String(MAX_ELECTION_SLUG), primary_key=True)
+ name = Column(String(MAX_ELECTION_NAME), nullable=False)
+ type = Column(String(64), default="general_election")
+ datetime_start_nominations = Column(DateTime, nullable=False)
+ datetime_start_voting = Column(DateTime, nullable=False)
+ datetime_end_voting = Column(DateTime, nullable=False)
+ survey_link = Column(String(300))
+
+ def serializable_dict(self) -> dict:
+ return {
+ "slug": self.slug,
+ "name": self.name,
+ "type": self.type,
+
+ "datetime_start_nominations": self.datetime_start_nominations.isoformat(),
+ "datetime_start_voting": self.datetime_start_voting.isoformat(),
+ "datetime_end_voting": self.datetime_end_voting.isoformat(),
+
+ "survey_link": self.survey_link,
+ }
+
+ def public_details(self) -> dict:
+ return {
+ "slug": self.slug,
+ "name": self.name,
+ "type": self.type,
+
+ "datetime_start_nominations": self.datetime_start_nominations.isoformat(),
+ "datetime_start_voting": self.datetime_start_voting.isoformat(),
+ "datetime_end_voting": self.datetime_end_voting.isoformat(),
+ }
+
+ def to_update_dict(self) -> dict:
+ return {
+ "slug": self.slug,
+ "name": self.name,
+ "type": self.type,
+
+ "datetime_start_nominations": self.datetime_start_nominations,
+ "datetime_start_voting": self.datetime_start_voting,
+ "datetime_end_voting": self.datetime_end_voting,
+
+ "survey_link": self.survey_link,
+ }
+
+# Each row represents a nominee of a given election
+class Nominee(Base):
+ __tablename__ = "election_nominee"
+
+ # Previously named sfuid
+ computing_id = Column(String(COMPUTING_ID_LEN), primary_key=True)
+ full_name = Column(String(64), nullable=False)
+ facebook = Column(String(128))
+ instagram = Column(String(128))
+ email = Column(String(64))
+ discord = Column(String(DISCORD_NAME_LEN))
+ discord_id = Column(String(DISCORD_ID_LEN))
+ discord_username = Column(String(DISCORD_NICKNAME_LEN))
+
+class NomineeApplication(Base):
+ __tablename__ = "nominee_application"
+
+ computing_id = Column(ForeignKey("election_nominee.computing_id"), primary_key=True)
+ nominee_election = Column(ForeignKey("election.slug"), primary_key=True)
+ speech = Column(Text)
+ position = Column(String(64), nullable=False)
+
+ __table_args__ = (
+ PrimaryKeyConstraint(computing_id, nominee_election),
+ )
diff --git a/src/elections/urls.py b/src/elections/urls.py
new file mode 100644
index 0000000..4e1520c
--- /dev/null
+++ b/src/elections/urls.py
@@ -0,0 +1,256 @@
+import logging
+import re
+from datetime import datetime
+
+from fastapi import APIRouter, HTTPException, Request, status
+from fastapi.exceptions import RequestValidationError
+from fastapi.responses import JSONResponse
+
+import database
+import elections
+from elections.tables import Election, election_types
+from permission.types import ElectionOfficer, WebsiteAdmin
+from utils.urls import is_logged_in
+
+_logger = logging.getLogger(__name__)
+
+router = APIRouter(
+ prefix="/elections",
+ tags=["elections"],
+)
+
+def _slugify(text: str) -> str:
+ """Creates a unique slug based on text passed in. Assumes non-unicode text."""
+ return re.sub(r"[\W_]+", "-", text.strip().replace("/", "").replace("&", ""))
+
+async def _validate_user(
+ request: Request,
+ db_session: database.DBSession,
+) -> tuple[bool, str, str]:
+ logged_in, session_id, computing_id = await is_logged_in(request, db_session)
+ if not logged_in:
+ return False, None, None
+
+ has_permission = await ElectionOfficer.has_permission(db_session, computing_id)
+ if not has_permission:
+ has_permission = await WebsiteAdmin.has_permission(db_session, computing_id)
+
+ return has_permission, session_id, computing_id
+
+# elections ------------------------------------------------------------- #
+
+@router.get(
+ "/by_name/{name:str}",
+ description="Retrieves the election data for an election by name"
+)
+async def get_election(
+ request: Request,
+ db_session: database.DBSession,
+ name: str,
+):
+ election = await elections.crud.get_election(db_session, _slugify(name))
+ if election is None:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"election with slug {_slugify(name)} does not exist"
+ )
+ elif datetime.now() >= election.datetime_start_voting:
+ # after the voting period starts, all election data becomes public
+ return JSONResponse(election.serializable_dict())
+
+ is_valid_user, _, _ = await _validate_user(request, db_session)
+ return JSONResponse(
+ election.serializable_dict()
+ if is_valid_user
+ else election.public_details()
+ )
+
+@router.post(
+ "/by_name/{name:str}",
+ description="Creates an election and places it in the database. Returns election json on success",
+)
+async def create_election(
+ request: Request,
+ db_session: database.DBSession,
+ name: str,
+ election_type: str,
+ datetime_start_nominations: datetime,
+ datetime_start_voting: datetime,
+ datetime_end_voting: datetime,
+ survey_link: str | None,
+):
+ if election_type not in election_types:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"unknown election type {election_type}",
+ )
+
+ is_valid_user, _, _ = await _validate_user(request, db_session)
+ if not is_valid_user:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="must have election officer or admin permission",
+ # TODO: is this header actually required?
+ headers={"WWW-Authenticate": "Basic"},
+ )
+ elif len(name) > elections.tables.MAX_ELECTION_NAME:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"election name {name} is too long",
+ )
+ elif len(_slugify(name)) > elections.tables.MAX_ELECTION_SLUG:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"election slug {_slugify(name)} is too long",
+ )
+ elif await elections.crud.get_election(db_session, _slugify(name)) is not None:
+ # don't overwrite a previous election
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="would overwrite previous election",
+ )
+ elif not (
+ (datetime_start_nominations <= datetime_start_voting)
+ and (datetime_start_voting <= datetime_end_voting)
+ ):
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="dates must be in order from earliest to latest",
+ )
+
+ await elections.crud.create_election(
+ db_session,
+ Election(
+ slug = _slugify(name),
+ name = name,
+ type = election_type,
+ datetime_start_nominations = datetime_start_nominations,
+ datetime_start_voting = datetime_start_voting,
+ datetime_end_voting = datetime_end_voting,
+ survey_link = survey_link
+ )
+ )
+ await db_session.commit()
+
+ election = await elections.crud.get_election(db_session, _slugify(name))
+ return JSONResponse(election.serializable_dict())
+
+@router.patch(
+ "/by_name/{name:str}",
+ description="""
+ Updates an election in the database.
+
+ Note that this doesn't let you change the name of an election, unless the new
+ name produces the same slug.
+
+ Returns election json on success.
+ """
+)
+async def update_election(
+ request: Request,
+ db_session: database.DBSession,
+ name: str,
+ election_type: str,
+ datetime_start_nominations: datetime,
+ datetime_start_voting: datetime,
+ datetime_end_voting: datetime,
+ survey_link: str | None,
+):
+ is_valid_user, _, _ = await _validate_user(request, db_session)
+ if not is_valid_user:
+ # let's workshop how we actually wanna handle this
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="must have election officer or admin permission",
+ headers={"WWW-Authenticate": "Basic"},
+ )
+ elif not (
+ (datetime_start_nominations <= datetime_start_voting)
+ and (datetime_start_voting <= datetime_end_voting)
+ ):
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="dates must be in order from earliest to latest",
+ )
+
+ new_election = Election(
+ slug = _slugify(name),
+ name = name,
+ type = election_type,
+ datetime_start_nominations = datetime_start_nominations,
+ datetime_start_voting = datetime_start_voting,
+ datetime_end_voting = datetime_end_voting,
+ survey_link = survey_link
+ )
+ success = await elections.crud.update_election(db_session, new_election)
+ if not success:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"election with slug {_slugify(name)} does not exist",
+ )
+ else:
+ await db_session.commit()
+
+ election = await elections.crud.get_election(db_session, _slugify(name))
+ return JSONResponse(election.serializable_dict())
+
+@router.delete(
+ "/by_name/{name:str}",
+ description="Deletes an election from the database. Returns whether the election exists after deletion."
+)
+async def delete_election(
+ request: Request,
+ db_session: database.DBSession,
+ name: str
+):
+ is_valid_user, _, _ = await _validate_user(request, db_session)
+ if not is_valid_user:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="must have election officer permission",
+ # TODO: is this header actually required?
+ headers={"WWW-Authenticate": "Basic"},
+ )
+
+ await elections.crud.delete_election(db_session, _slugify(name))
+ await db_session.commit()
+
+ old_election = await elections.crud.get_election(db_session, _slugify(name))
+ return JSONResponse({"exists": old_election is not None})
+
+# registration ------------------------------------------------------------- #
+
+@router.post(
+ "/register/{name:str}",
+ description="allows a user to register for an election"
+)
+async def register_in_election(
+ request: Request,
+ db_session: database.DBSession,
+ name: str
+):
+ # TODO: associate specific elections officers with specific elections, then don't
+ # allow any elections officer running an election to register for it
+ pass
+
+@router.patch(
+ "/register/{name:str}",
+ description="update your registration for an election"
+)
+async def update_registration(
+ request: Request,
+ db_session: database.DBSession,
+ name: str
+):
+ pass
+
+@router.delete(
+ "/register/{name:str}",
+ description="revoke your registration in the election"
+)
+async def delete_registration(
+ request: Request,
+ db_session: database.DBSession,
+ name: str
+):
+ pass
diff --git a/src/load_test_db.py b/src/load_test_db.py
index b9bc596..21d2797 100644
--- a/src/load_test_db.py
+++ b/src/load_test_db.py
@@ -3,13 +3,17 @@
# python load_test_db.py
import asyncio
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncSession
+# NOTE: make sure you import from a file in your module which (at least) indirectly contains those
+# tables, or the current python context will not be able to find them & they won't be loaded
from auth.crud import create_user_session, update_site_user
from database import SQLALCHEMY_TEST_DATABASE_URL, Base, DatabaseSessionManager
+from elections.crud import create_election, update_election
+from elections.tables import Election
from officers.constants import OfficerPosition
from officers.crud import create_new_officer_info, create_new_officer_term, update_officer_info, update_officer_term
from officers.tables import OfficerInfo, OfficerTerm
@@ -57,6 +61,9 @@ async def reset_db(engine):
else:
print(f"new tables: {table_list}")
+# ----------------------------------------------------------------- #
+# load db with test data
+
async def load_test_auth_data(db_session: AsyncSession):
await create_user_session(db_session, "temp_id_314", "abc314")
await update_site_user(db_session, "temp_id_314", "www.my_profile_picture_url.ca/test")
@@ -282,12 +289,64 @@ async def load_sysadmin(db_session: AsyncSession):
))
await db_session.commit()
+async def load_test_elections_data(db_session: AsyncSession):
+ print("loading elections data...")
+ await create_election(db_session, Election(
+ slug="test-election-1",
+ name="test election 1",
+ type="general_election",
+ datetime_start_nominations=datetime.now() - timedelta(days=400),
+ datetime_start_voting=datetime.now() - timedelta(days=395, hours=4),
+ datetime_end_voting=datetime.now() - timedelta(days=390, hours=8),
+ survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5"
+ ))
+ await update_election(db_session, Election(
+ slug="test-election-1",
+ name="test election 1",
+ type="general_election",
+ datetime_start_nominations=datetime.now() - timedelta(days=400),
+ datetime_start_voting=datetime.now() - timedelta(days=395, hours=4),
+ datetime_end_voting=datetime.now() - timedelta(days=390, hours=8),
+ survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5"
+ ))
+ await create_election(db_session, Election(
+ slug="test-election-2",
+ name="test election 2",
+ type="by_election",
+ datetime_start_nominations=datetime.now() - timedelta(days=300),
+ datetime_start_voting=datetime.now() - timedelta(days=295, hours=4),
+ datetime_end_voting=datetime.now() - timedelta(days=290, hours=8),
+ survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5 (oh yeah)"
+ ))
+ await create_election(db_session, Election(
+ slug="my-cr-election-3",
+ name="my cr election 3",
+ type="council_rep_election",
+ datetime_start_nominations=datetime.now() - timedelta(days=5),
+ datetime_start_voting=datetime.now() - timedelta(days=1, hours=4),
+ datetime_end_voting=datetime.now() + timedelta(days=5, hours=8),
+ survey_link="https://youtu.be/dQw4w9WgXcQ?si=kZROi2tu-43MXPM5"
+ ))
+ await create_election(db_session, Election(
+ slug="THE-SUPER-GENERAL-ELECTION-friends",
+ name="THE SUPER GENERAL ELECTION & friends",
+ type="general_election",
+ datetime_start_nominations=datetime.now() + timedelta(days=5),
+ datetime_start_voting=datetime.now() + timedelta(days=10, hours=4),
+ datetime_end_voting=datetime.now() + timedelta(days=15, hours=8),
+ survey_link=None
+ ))
+ await db_session.commit()
+
+# ----------------------------------------------------------------- #
+
async def async_main(sessionmanager):
await reset_db(sessionmanager._engine)
async with sessionmanager.session() as db_session:
await load_test_auth_data(db_session)
await load_test_officers_data(db_session)
await load_sysadmin(db_session)
+ await load_test_elections_data(db_session)
if __name__ == "__main__":
response = input(f"This will reset the {SQLALCHEMY_TEST_DATABASE_URL} database, are you okay with this? (y/N): ")
diff --git a/src/main.py b/src/main.py
index f78060a..82cb031 100755
--- a/src/main.py
+++ b/src/main.py
@@ -1,21 +1,53 @@
import logging
+import os
-from fastapi import FastAPI
+from fastapi import FastAPI, Request, status
+from fastapi.encoders import jsonable_encoder
+from fastapi.exceptions import RequestValidationError
+from fastapi.responses import JSONResponse
import auth.urls
import database
+import elections.urls
import officers.urls
import permission.urls
logging.basicConfig(level=logging.DEBUG)
database.setup_database()
-app = FastAPI(lifespan=database.lifespan, title="CSSS Site Backend", root_path="/api")
+_login_link = (
+ "https://cas.sfu.ca/cas/login?service=" + (
+ "http%3A%2F%2Flocalhost%3A8080"
+ if os.environ.get("LOCAL") == "true"
+ else "https%3A%2F%2Fnew.sfucsss.org"
+ ) + "%2Fapi%2Fauth%2Flogin%3Fredirect_path%3D%2Fapi%2Fapi%2Fdocs%2F%26redirect_fragment%3D"
+)
+
+app = FastAPI(
+ lifespan=database.lifespan,
+ title="CSSS Site Backend",
+ description=f'To login, please click here
To logout from CAS click here',
+ root_path="/api"
+)
app.include_router(auth.urls.router)
+app.include_router(elections.urls.router)
app.include_router(officers.urls.router)
app.include_router(permission.urls.router)
@app.get("/")
async def read_root():
return {"message": "Hello! You might be lost, this is actually the sfucsss.org's backend api."}
+
+@app.exception_handler(RequestValidationError)
+async def validation_exception_handler(
+ request: Request,
+ exception: RequestValidationError,
+):
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content=jsonable_encoder({
+ "detail": exception.errors(),
+ "body": exception.body,
+ })
+ )
diff --git a/src/officers/urls.py b/src/officers/urls.py
index decdb10..8cb82f3 100755
--- a/src/officers/urls.py
+++ b/src/officers/urls.py
@@ -10,6 +10,7 @@
from officers.tables import OfficerInfo, OfficerTerm
from officers.types import InitialOfficerInfo, OfficerInfoUpload, OfficerTermUpload
from permission.types import OfficerPrivateInfo, WebsiteAdmin
+from utils.urls import logged_in_or_raise
_logger = logging.getLogger(__name__)
@@ -21,7 +22,7 @@
# ---------------------------------------- #
# checks
-async def has_officer_private_info_access(
+async def _has_officer_private_info_access(
request: Request,
db_session: database.DBSession
) -> tuple[None | str, None | str, bool]:
@@ -37,21 +38,6 @@ async def has_officer_private_info_access(
has_private_access = await OfficerPrivateInfo.has_permission(db_session, computing_id)
return session_id, computing_id, has_private_access
-async def logged_in_or_raise(
- request: Request,
- db_session: database.DBSession
-) -> tuple[str, str]:
- """gets the user's computing_id, or raises an exception if the current request is not logged in"""
- session_id = request.cookies.get("session_id", None)
- if session_id is None:
- raise HTTPException(status_code=401)
-
- session_computing_id = await auth.crud.get_computing_id(db_session, session_id)
- if session_computing_id is None:
- raise HTTPException(status_code=401)
-
- return session_id, session_computing_id
-
# ---------------------------------------- #
# endpoints
@@ -64,7 +50,7 @@ async def current_officers(
request: Request,
db_session: database.DBSession,
):
- _, _, has_private_access = await has_officer_private_info_access(request, db_session)
+ _, _, has_private_access = await _has_officer_private_info_access(request, db_session)
current_officers = await officers.crud.current_officers(db_session, has_private_access)
return JSONResponse({
position: [
@@ -85,7 +71,7 @@ async def all_officers(
# and may only be accessed by that officer and executives. All other officer terms are public.
include_future_terms: bool = False,
):
- _, computing_id, has_private_access = await has_officer_private_info_access(request, db_session)
+ _, computing_id, has_private_access = await _has_officer_private_info_access(request, db_session)
if include_future_terms:
is_website_admin = (computing_id is not None) and (await WebsiteAdmin.has_permission(db_session, computing_id))
if not is_website_admin:
diff --git a/src/permission/types.py b/src/permission/types.py
index 659ed32..73939a6 100644
--- a/src/permission/types.py
+++ b/src/permission/types.py
@@ -4,6 +4,8 @@
from fastapi import HTTPException
import database
+import elections.crud
+import officers.constants
import officers.crud
import utils
from data.semesters import step_semesters
@@ -29,6 +31,26 @@ async def has_permission(db_session: database.DBSession, computing_id: str) -> b
return False
+class ElectionOfficer:
+ @staticmethod
+ async def has_permission(db_session: database.DBSession, computing_id: str) -> bool:
+ """
+ An current elections officer has access to all elections, prior elections officers have no access.
+ """
+ officer_terms = await officers.crud.current_officers(db_session, True)
+ current_election_officer = officer_terms.get(
+ officers.constants.OfficerPosition.ELECTIONS_OFFICER
+ )
+ if current_election_officer is not None:
+ for election_officer in current_election_officer[1]:
+ if (
+ election_officer.private_data.computing_id == computing_id
+ and election_officer.is_current_officer
+ ):
+ return True
+
+ return False
+
class WebsiteAdmin:
WEBSITE_ADMIN_POSITIONS: ClassVar[list[OfficerPosition]] = [
OfficerPosition.PRESIDENT,
diff --git a/src/utils.py b/src/utils/__init__.py
similarity index 99%
rename from src/utils.py
rename to src/utils/__init__.py
index acf5ad0..c52bd4c 100644
--- a/src/utils.py
+++ b/src/utils/__init__.py
@@ -68,3 +68,4 @@ def is_valid_phone_number(phone_number: str) -> bool:
def is_valid_email(email: str):
return re.match(r"^[^@]+@[^@]+\.[a-zA-Z]*$", email)
+
diff --git a/src/utils/urls.py b/src/utils/urls.py
new file mode 100644
index 0000000..13acb86
--- /dev/null
+++ b/src/utils/urls.py
@@ -0,0 +1,36 @@
+from fastapi import HTTPException, Request
+
+import auth
+import database
+
+# TODO: move other utils into this module
+
+async def logged_in_or_raise(
+ request: Request,
+ db_session: database.DBSession
+) -> tuple[str, str]:
+ """gets the user's computing_id, or raises an exception if the current request is not logged in"""
+ session_id = request.cookies.get("session_id", None)
+ if session_id is None:
+ raise HTTPException(status_code=401, detail="no session id")
+
+ session_computing_id = await auth.crud.get_computing_id(db_session, session_id)
+ if session_computing_id is None:
+ raise HTTPException(status_code=401, detail="no computing id")
+
+ return session_id, session_computing_id
+
+async def is_logged_in(
+ request: Request,
+ db_session: database.DBSession
+) -> tuple[str | None, str | None]:
+ """gets the user's computing_id, or raises an exception if the current request is not logged in"""
+ session_id = request.cookies.get("session_id", None)
+ if session_id is None:
+ return False, None, None
+
+ session_computing_id = await auth.crud.get_computing_id(db_session, session_id)
+ if session_computing_id is None:
+ return False, None, None
+
+ return True, session_id, session_computing_id