Skip to content

Commit 69db25b

Browse files
authored
Merge pull request #51 from CSSS/dev-issue-12
Exam Bank Watermarking
2 parents 6295017 + 686a4e4 commit 69db25b

File tree

2 files changed

+145
-11
lines changed

2 files changed

+145
-11
lines changed

src/exambank/watermark.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from io import BytesIO
2+
from pathlib import Path
3+
4+
import pymupdf
5+
from pypdf import PdfReader, PdfWriter
6+
from reportlab.lib import colors
7+
from reportlab.lib.pagesizes import A4
8+
from reportlab.lib.units import inch
9+
from reportlab.pdfgen import canvas
10+
11+
# param to describe margin for exam generation text
12+
BORDER = 20
13+
14+
def create_watermark(
15+
computing_id: str,
16+
density: int = 5
17+
) -> BytesIO:
18+
"""
19+
Returns a PDF with one page containing the watermark as text.
20+
"""
21+
# Generate the tiling watermark
22+
stamp_buffer = BytesIO()
23+
stamp_pdf = canvas.Canvas(stamp_buffer, pagesize=A4)
24+
stamp_pdf.translate(density / 3 * inch, -density / 3 * inch)
25+
stamp_pdf.setFillColor(colors.grey, alpha=0.5)
26+
stamp_pdf.setFont("Helvetica", 100 / density)
27+
stamp_pdf.rotate(45)
28+
29+
width, height = A4
30+
31+
for i in range(1, 4 * int(width), int(width/density)):
32+
for j in range(1, 4 * int(height), int(height/density)):
33+
stamp_pdf.drawCentredString(i, j, computing_id)
34+
stamp_pdf.save()
35+
stamp_buffer.seek(0)
36+
37+
# Generate the warning text in corner
38+
warning_buffer = BytesIO()
39+
warning_pdf = canvas.Canvas(warning_buffer, pagesize=A4)
40+
warning_pdf.setFillColor(colors.grey, alpha=0.75)
41+
warning_pdf.setFont("Helvetica", 14)
42+
43+
from datetime import datetime
44+
warning_pdf.drawString(BORDER, BORDER, f"This exam was generated by {computing_id} at {datetime.now()}")
45+
46+
warning_pdf.save()
47+
warning_buffer.seek(0)
48+
49+
# Merge into watermark
50+
watermark_pdf = PdfWriter()
51+
stamp_pdf = PdfReader(stamp_buffer)
52+
warning_pdf = PdfReader(warning_buffer)
53+
# Destructively merges in place
54+
stamp_pdf.pages[0].merge_page(warning_pdf.pages[0])
55+
watermark_pdf.add_page(stamp_pdf.pages[0])
56+
57+
watermark_buffer = BytesIO()
58+
watermark_pdf.write(watermark_buffer)
59+
watermark_buffer.seek(0)
60+
return watermark_buffer
61+
62+
def apply_watermark(
63+
pdf_path: Path | str,
64+
# expect a BytesIO instance (at position 0), accept a file/path
65+
stamp: BytesIO | Path | str,
66+
) -> BytesIO:
67+
# process file
68+
stamp_page = PdfReader(stamp).pages[0]
69+
70+
writer = PdfWriter()
71+
reader = PdfReader(pdf_path)
72+
writer.append(reader)
73+
for pdf_page in writer.pages:
74+
pdf_page.transfer_rotation_to_content()
75+
pdf_page.merge_page(stamp_page)
76+
77+
watermarked_pdf = BytesIO()
78+
writer.write(watermarked_pdf)
79+
watermarked_pdf.seek(0)
80+
81+
return watermarked_pdf
82+
83+
def raster_pdf(
84+
pdf_path: BytesIO,
85+
dpi: int = 300
86+
) -> BytesIO:
87+
raster_buffer = BytesIO()
88+
# adapted from https://github.com/pymupdf/PyMuPDF/discussions/1183
89+
with pymupdf.open(stream=pdf_path) as doc:
90+
page_count = doc.page_count
91+
with pymupdf.open() as target:
92+
for page, _dpi in zip(doc, [dpi] * page_count, strict=False):
93+
zoom = _dpi / 72
94+
mat = pymupdf.Matrix(zoom, zoom)
95+
pix = page.get_pixmap(matrix=mat)
96+
tarpage = target.new_page(width=page.rect.width, height=page.rect.height)
97+
tarpage.insert_image(tarpage.rect, stream=pix.pil_tobytes("PNG"))
98+
99+
target.save(raster_buffer)
100+
raster_buffer.seek(0)
101+
return raster_buffer
102+
103+
def raster_pdf_from_path(
104+
pdf_path: Path | str,
105+
dpi: int = 300
106+
) -> BytesIO:
107+
raster_buffer = BytesIO()
108+
# adapted from https://github.com/pymupdf/PyMuPDF/discussions/1183
109+
with pymupdf.open(filename=pdf_path) as doc:
110+
page_count = doc.page_count
111+
with pymupdf.open() as target:
112+
for page, _dpi in zip(doc, [dpi] * page_count, strict=False):
113+
zoom = _dpi / 72
114+
mat = pymupdf.Matrix(zoom, zoom)
115+
pix = page.get_pixmap(matrix=mat)
116+
tarpage = target.new_page(width=page.rect.width, height=page.rect.height)
117+
tarpage.insert_image(tarpage.rect, stream=pix.pil_tobytes("PNG"))
118+
119+
target.save(raster_buffer)
120+
raster_buffer.seek(0)
121+
return raster_buffer

src/github/github.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ async def _github_request_get(
3636
)
3737
rate_limit_remaining = int(result.headers["x-ratelimit-remaining"])
3838
if rate_limit_remaining < 50:
39+
# Less than 50 api calls remaining before being rate limited
3940
return None
4041

4142
return result
@@ -56,6 +57,7 @@ async def _github_request_post(
5657
)
5758
rate_limit_remaining = int(result.headers["x-ratelimit-remaining"])
5859
if rate_limit_remaining < 50:
60+
# Less than 50 api calls remaining before being rate limited, please try again later
5961
return None
6062

6163
return result
@@ -73,6 +75,7 @@ async def _github_request_delete(
7375
)
7476
rate_limit_remaining = int(result.headers["x-ratelimit-remaining"])
7577
if rate_limit_remaining < 50:
78+
# Less than 50 api calls remaining before being rate limited
7679
return None
7780

7881
return result
@@ -92,39 +95,46 @@ async def _github_request_put(
9295
)
9396
rate_limit_remaining = int(result.headers["x-ratelimit-remaining"])
9497
if rate_limit_remaining < 50:
98+
# Less than 50 api calls remaining before being rate limited
9599
return None
96100

97101
return result
98102

99103
async def get_user_by_username(
100104
username: str
101-
) -> GithubUser:
105+
) -> GithubUser | None:
102106
"""
103107
Takes in a Github username and returns an instance of GithubUser.
104108
105-
May return None if no such user was found.
109+
Returns None if no such user was found.
106110
"""
107-
result = await _github_request_get(f"https://api.github.com/users/{username}",
108-
os.environ.get("GITHUB_TOKEN"))
111+
result = await _github_request_get(
112+
f"https://api.github.com/users/{username}",
113+
os.environ.get("GITHUB_TOKEN"),
114+
)
109115
result_json = result.json()
110116
if result_json["status"] == "404":
111117
return None
112-
return GithubUser(result_json["login"], result_json["id"], result_json["name"])
118+
else:
119+
return GithubUser(result_json["login"], result_json["id"], result_json["name"])
113120

114121
async def get_user_by_id(
115122
uid: str
116-
) -> GithubUser:
123+
) -> GithubUser | None:
117124
"""
118125
Takes in a Github user id and returns an instance of GithubUser.
119126
120-
May return None if no such user was found.
127+
Returns None if no such user was found.
121128
"""
122-
result = await _github_request_get(f"https://api.github.com/user/{uid}",
123-
os.environ.get("GITHUB_TOKEN"))
129+
result = await _github_request_get(
130+
f"https://api.github.com/user/{uid}",
131+
os.environ.get("GITHUB_TOKEN"),
132+
)
124133
result_json = result.json()
125134
if result_json["status"] == "404":
126135
return None
127-
return GithubUser(result_json["login"], result_json["id"], result_json["name"])
136+
else:
137+
return GithubUser(result_json["login"], result_json["id"], result_json["name"])
128138

129139
async def add_user_to_org(
130140
org: str = github_org_name,
@@ -151,7 +161,10 @@ async def add_user_to_org(
151161
result_json = result.json()
152162
# Logging here potentially?
153163
if result.status_code != 201:
154-
raise Exception(f"Status code {result.status_code} returned when attempting to add user to org: {result_json['message']}: {[error['message'] for error in result_json['errors']]}")
164+
raise Exception(
165+
f"Status code {result.status_code} returned when attempting to add user to org: "
166+
f"{result_json['message']}: {[error['message'] for error in result_json['errors']]}"
167+
)
155168

156169
async def delete_user_from_org(
157170
username: str,

0 commit comments

Comments
 (0)