Skip to content

Commit 5898196

Browse files
tw4l and ikreymer authored
Optionally delay replica deletion (#2252)
Fixes #2170. The number of days by which to delay replica file deletion is configurable in the Helm chart with `replica_deletion_delay_days` (set by default to 7 days in `values.yaml` to encourage good practice, though we could change this). When `replica_deletion_delay_days` is set to an int above 0, and a delete-replica job would otherwise be started as a Kubernetes Job, a CronJob is created instead with a cron schedule set to run yearly, starting x days from the current moment. This CronJob is then deleted by the operator after the job successfully completes. If a failed background job is retried, it is re-run immediately as a Job rather than being scheduled out into the future again. --------- Co-authored-by: Ilya Kreymer <[email protected]>
1 parent 2060ee7 commit 5898196

File tree

8 files changed

+169
-22
lines changed

8 files changed

+169
-22
lines changed

backend/btrixcloud/background_jobs.py

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ async def handle_replica_job_finished(self, job: CreateReplicaJob) -> None:
9696
if not res:
9797
print("File deleted before replication job started, ignoring", flush=True)
9898

99+
async def handle_delete_replica_job_finished(self, job: DeleteReplicaJob) -> None:
100+
"""After successful replica deletion, delete cronjob if scheduled"""
101+
if job.schedule:
102+
await self.crawl_manager.delete_replica_deletion_scheduled_job(job.id)
103+
99104
async def create_replica_jobs(
100105
self, oid: UUID, file: BaseFile, object_id: str, object_type: str
101106
) -> Dict[str, Union[bool, List[str]]]:
@@ -146,7 +151,7 @@ async def create_replica_job(
146151
job_type = BgJobType.CREATE_REPLICA.value
147152

148153
try:
149-
job_id = await self.crawl_manager.run_replica_job(
154+
job_id, _ = await self.crawl_manager.run_replica_job(
150155
oid=str(org.id),
151156
job_type=job_type,
152157
primary_storage=file.storage,
@@ -155,7 +160,7 @@ async def create_replica_job(
155160
replica_storage=replica_ref,
156161
replica_file_path=replica_file_path,
157162
replica_endpoint=replica_endpoint,
158-
job_id_prefix=f"{job_type}-{object_id}",
163+
delay_days=0,
159164
existing_job_id=existing_job_id,
160165
)
161166
if existing_job_id:
@@ -188,9 +193,13 @@ async def create_replica_job(
188193
)
189194

190195
return job_id
196+
# pylint: disable=broad-exception-caught
191197
except Exception as exc:
192-
# pylint: disable=raise-missing-from
193-
raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}")
198+
print(
199+
"warning: replica job could not be started "
200+
+ f"for {object_type} {file}: {exc}"
201+
)
202+
return ""
194203

195204
async def create_delete_replica_jobs(
196205
self, org: Organization, file: BaseFile, object_id: str, object_type: str
@@ -214,8 +223,9 @@ async def create_delete_replica_job(
214223
object_id: str,
215224
object_type: str,
216225
replica_ref: StorageRef,
226+
force_start_immediately: bool = False,
217227
existing_job_id: Optional[str] = None,
218-
) -> Optional[str]:
228+
) -> str:
219229
"""Create a job to delete one replica of a given file"""
220230
try:
221231
replica_storage = self.storage_ops.get_org_storage_by_ref(org, replica_ref)
@@ -226,20 +236,23 @@ async def create_delete_replica_job(
226236

227237
job_type = BgJobType.DELETE_REPLICA.value
228238

229-
job_id = await self.crawl_manager.run_replica_job(
239+
delay_days = int(os.environ.get("REPLICA_DELETION_DELAY_DAYS", 0))
240+
if force_start_immediately:
241+
delay_days = 0
242+
243+
job_id, schedule = await self.crawl_manager.run_replica_job(
230244
oid=str(org.id),
231245
job_type=job_type,
232246
replica_storage=replica_ref,
233247
replica_file_path=replica_file_path,
234248
replica_endpoint=replica_endpoint,
235-
job_id_prefix=f"{job_type}-{object_id}",
249+
delay_days=delay_days,
236250
existing_job_id=existing_job_id,
237251
)
238252

239253
if existing_job_id:
240-
delete_replica_job = await self.get_background_job(
241-
existing_job_id, org.id
242-
)
254+
job = await self.get_background_job(existing_job_id, org.id)
255+
delete_replica_job = cast(DeleteReplicaJob, job)
243256
previous_attempt = {
244257
"started": delete_replica_job.started,
245258
"finished": delete_replica_job.finished,
@@ -251,6 +264,7 @@ async def create_delete_replica_job(
251264
delete_replica_job.started = dt_now()
252265
delete_replica_job.finished = None
253266
delete_replica_job.success = None
267+
delete_replica_job.schedule = None
254268
else:
255269
delete_replica_job = DeleteReplicaJob(
256270
id=job_id,
@@ -260,6 +274,7 @@ async def create_delete_replica_job(
260274
object_id=object_id,
261275
object_type=object_type,
262276
replica_storage=replica_ref,
277+
schedule=schedule,
263278
)
264279

265280
await self.jobs.find_one_and_update(
@@ -274,7 +289,7 @@ async def create_delete_replica_job(
274289
"warning: replica deletion job could not be started "
275290
+ f"for {object_type} {file}: {exc}"
276291
)
277-
return None
292+
return ""
278293

279294
async def create_delete_org_job(
280295
self,
@@ -387,6 +402,10 @@ async def job_finished(
387402
if success:
388403
if job_type == BgJobType.CREATE_REPLICA:
389404
await self.handle_replica_job_finished(cast(CreateReplicaJob, job))
405+
if job_type == BgJobType.DELETE_REPLICA:
406+
await self.handle_delete_replica_job_finished(
407+
cast(DeleteReplicaJob, job)
408+
)
390409
else:
391410
print(
392411
f"Background job {job.id} failed, sending email to superuser",
@@ -560,6 +579,7 @@ async def retry_background_job(
560579
job.object_id,
561580
job.object_type,
562581
job.replica_storage,
582+
force_start_immediately=True,
563583
existing_job_id=job_id,
564584
)
565585

backend/btrixcloud/crawlmanager.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
import secrets
55

6-
from typing import Optional, Dict
6+
from typing import Optional, Dict, Tuple
77
from datetime import timedelta
88

99
from fastapi import HTTPException
@@ -72,24 +72,21 @@ async def run_replica_job(
7272
replica_storage: StorageRef,
7373
replica_file_path: str,
7474
replica_endpoint: str,
75+
delay_days: int = 0,
7576
primary_storage: Optional[StorageRef] = None,
7677
primary_file_path: Optional[str] = None,
7778
primary_endpoint: Optional[str] = None,
78-
job_id_prefix: Optional[str] = None,
7979
existing_job_id: Optional[str] = None,
80-
):
80+
) -> Tuple[str, Optional[str]]:
8181
"""run job to replicate file from primary storage to replica storage"""
8282

8383
if existing_job_id:
8484
job_id = existing_job_id
8585
else:
86-
if not job_id_prefix:
87-
job_id_prefix = job_type
86+
# Keep name shorter than in past to avoid k8s issues with length
87+
job_id = f"{job_type}-{secrets.token_hex(5)}"
8888

89-
# ensure name is <=63 characters
90-
job_id = f"{job_id_prefix[:52]}-{secrets.token_hex(5)}"
91-
92-
params = {
89+
params: Dict[str, object] = {
9390
"id": job_id,
9491
"oid": oid,
9592
"job_type": job_type,
@@ -106,11 +103,17 @@ async def run_replica_job(
106103
"BgJobType": BgJobType,
107104
}
108105

106+
if job_type == BgJobType.DELETE_REPLICA.value and delay_days > 0:
107+
# If replica deletion delay is configured, schedule as cronjob
108+
return await self.create_replica_deletion_scheduled_job(
109+
job_id, params, delay_days
110+
)
111+
109112
data = self.templates.env.get_template("replica_job.yaml").render(params)
110113

111114
await self.create_from_yaml(data)
112115

113-
return job_id
116+
return job_id, None
114117

115118
async def run_delete_org_job(
116119
self,
@@ -393,3 +396,37 @@ async def update_scheduled_job(
393396
await self.create_from_yaml(data, self.namespace)
394397

395398
return cron_job_id
399+
400+
async def create_replica_deletion_scheduled_job(
401+
self,
402+
job_id: str,
403+
params: Dict[str, object],
404+
delay_days: int,
405+
) -> Tuple[str, Optional[str]]:
406+
"""create scheduled job to delay replica file in x days"""
407+
now = dt_now()
408+
run_at = now + timedelta(days=delay_days)
409+
schedule = f"{run_at.minute} {run_at.hour} {run_at.day} {run_at.month} *"
410+
411+
params["schedule"] = schedule
412+
413+
print(f"Replica deletion cron schedule: '{schedule}'", flush=True)
414+
415+
data = self.templates.env.get_template("replica_deletion_cron_job.yaml").render(
416+
params
417+
)
418+
419+
await self.create_from_yaml(data, self.namespace)
420+
421+
return job_id, schedule
422+
423+
async def delete_replica_deletion_scheduled_job(self, job_id: str):
424+
"""delete scheduled job to delay replica file in x days"""
425+
cron_job = await self.batch_api.read_namespaced_cron_job(
426+
name=job_id,
427+
namespace=self.namespace,
428+
)
429+
if cron_job:
430+
await self.batch_api.delete_namespaced_cron_job(
431+
name=cron_job.metadata.name, namespace=self.namespace
432+
)

backend/btrixcloud/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2058,6 +2058,7 @@ class DeleteReplicaJob(BackgroundJob):
20582058
object_type: str
20592059
object_id: str
20602060
replica_storage: StorageRef
2061+
schedule: Optional[str] = None
20612062

20622063

20632064
# ============================================================================

backend/btrixcloud/operator/bgjobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict:
3535
labels: dict[str, str] = metadata.get("labels", {})
3636
oid: str = labels.get("btrix.org") or ""
3737
job_type: str = labels.get("job_type") or ""
38-
job_id: str = metadata.get("name")
38+
job_id: str = labels.get("job_id") or metadata.get("name")
3939

4040
status = data.object["status"]
4141
success = status.get("succeeded") == 1
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
apiVersion: batch/v1
2+
kind: CronJob
3+
metadata:
4+
name: "{{ id }}"
5+
labels:
6+
role: "cron-background-job"
7+
job_type: {{ job_type }}
8+
btrix.org: {{ oid }}
9+
10+
spec:
11+
concurrencyPolicy: Forbid
12+
successfulJobsHistoryLimit: 0
13+
failedJobsHistoryLimit: 2
14+
15+
schedule: "{{ schedule }}"
16+
17+
jobTemplate:
18+
metadata:
19+
labels:
20+
role: "background-job"
21+
job_type: {{ job_type }}
22+
job_id: {{ id }}
23+
btrix.org: {{ oid }}
24+
25+
spec:
26+
template:
27+
spec:
28+
restartPolicy: Never
29+
priorityClassName: bg-job
30+
podFailurePolicy:
31+
rules:
32+
- action: FailJob
33+
onExitCodes:
34+
containerName: rclone
35+
operator: NotIn
36+
values: [0]
37+
38+
containers:
39+
- name: rclone
40+
image: rclone/rclone:latest
41+
env:
42+
43+
- name: RCLONE_CONFIG_REPLICA_TYPE
44+
value: "s3"
45+
46+
- name: RCLONE_CONFIG_REPLICA_ACCESS_KEY_ID
47+
valueFrom:
48+
secretKeyRef:
49+
name: "{{ replica_secret_name }}"
50+
key: STORE_ACCESS_KEY
51+
52+
- name: RCLONE_CONFIG_REPLICA_SECRET_ACCESS_KEY
53+
valueFrom:
54+
secretKeyRef:
55+
name: "{{ replica_secret_name }}"
56+
key: STORE_SECRET_KEY
57+
58+
- name: RCLONE_CONFIG_REPLICA_REGION
59+
valueFrom:
60+
secretKeyRef:
61+
name: "{{ replica_secret_name }}"
62+
key: STORE_REGION
63+
64+
- name: RCLONE_CONFIG_REPLICA_PROVIDER
65+
valueFrom:
66+
secretKeyRef:
67+
name: "{{ replica_secret_name }}"
68+
key: STORE_S3_PROVIDER
69+
70+
- name: RCLONE_CONFIG_REPLICA_ENDPOINT
71+
value: "{{ replica_endpoint }}"
72+
73+
command: ["rclone", "-vv", "delete", "replica:{{ replica_file_path }}"]
74+
75+
resources:
76+
limits:
77+
memory: "200Mi"
78+
79+
requests:
80+
memory: "200Mi"
81+
cpu: "50m"

chart/templates/configmap.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ data:
8585

8686
LOCALES_ENABLED: "{{ .Values.locales_enabled }}"
8787

88+
REPLICA_DELETION_DELAY_DAYS: "{{ .Values.replica_deletion_delay_days | default 0 }}"
89+
8890

8991
---
9092
apiVersion: v1

chart/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ superuser:
9696
# Set name for default organization created with superuser
9797
default_org: "My Organization"
9898

99+
# Set number of days replica file deletion should be delayed by
100+
# if set >0, will keep replicas (if any) for this number of days
101+
replica_deletion_delay_days: 0
102+
99103

100104
# API Image
101105
# =========================================

frontend/docs/docs/deploy/customization.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ storages:
133133
access_endpoint_url: "https://my-custom-domain.example.com/path/"
134134
```
135135

136+
When replica locations are set, the default behavior when a crawl, upload, or browser profile is deleted is that the replica files are deleted at the same time as the file in primary storage. To delay deletion of replicas, set `replica_deletion_delay_days` in the Helm chart to the number of days by which to delay replica file deletion. This feature gives Browsertrix administrators time in the event of files being deleted accidentally or maliciously to recover copies from configured replica locations.
137+
136138
## Horizontal Autoscaling
137139

138140
Browsertrix also includes support for horizontal auto-scaling for both the backend and frontend pods.

0 commit comments

Comments
 (0)