Rework crawl page migration + MongoDB Query Optimizations (#2412)
Fixes #2406 

Converts migration 0042 to launch a background job (parallelized across
several pods) to migrate all crawls by optimizing their pages and
setting `version: 2` on the crawl when complete.

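For context, a minimal sketch of what the reworked migration now amounts to; the `Migration`/`migrate_up` shape follows the existing btrixcloud migration pattern and `create_optimize_crawl_pages_job` is the method added in this PR, but the exact wiring below is illustrative rather than the literal contents of migration 0042:

```python
"""
Illustrative sketch of migration 0042: instead of rewriting pages inline,
launch a parallelized background job that optimizes each crawl's pages and
sets `version: 2` on the crawl when it finishes.
"""

from btrixcloud.migrations import BaseMigration  # assumed import path

MIGRATION_VERSION = "0042"


class Migration(BaseMigration):
    """Launch background job to optimize crawl pages"""

    def __init__(self, mdb, **kwargs):
        super().__init__(mdb, migration_version=MIGRATION_VERSION)
        # the migration runner is assumed to pass in the background job ops class
        self.background_job_ops = kwargs.get("background_job_ops")

    async def migrate_up(self):
        """Kick off the optimize-pages job rather than migrating crawls here"""
        if not self.background_job_ops:
            print("unable to start optimize pages job, background_job_ops missing")
            return

        # runs across MIGRATION_JOBS_SCALE pods via the migration_job.yaml template
        await self.background_job_ops.create_optimize_crawl_pages_job()
```
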
Also optimizes MongoDB queries for better performance.

Migration Improvements:

- Add `isMigrating` and `version` fields to `BaseCrawl`
- Add a new background job type for the migration, with an accompanying
`migration_job.yaml` template that allows for parallelization
- Add new API endpoint to launch this crawl migration job, and ensure
that we have list and retry endpoints for superusers that work with
background jobs that aren't tied to a specific org
- Rework background job models and methods now that not all background
jobs are tied to a single org
- Ensure new crawls and uploads have `version` set to `2`
- Modify crawl and collection replay.json endpoints to only include the
replay-optimization fields (`initialPages`, `pageQueryUrl`,
`preloadResources`) when all relevant crawls/uploads have `version` set to
`2` (see the sketch after this list)
- Remove `distinct` calls from migration pathways
- Consolidate collection stats recomputation

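As a rough illustration of the model and replay.json changes above, a sketch; the `isMigrating`, `version`, `initialPages`, `pageQueryUrl`, and `preloadResources` names come from this PR, while the defaults and the `include_replay_optimizations` helper are assumptions standing in for the actual endpoint logic:

```python
from typing import List, Optional

from pydantic import BaseModel


class BaseCrawl(BaseModel):
    """Subset of the crawl/upload model relevant here (other fields omitted)"""

    id: str
    # set while the background job is rewriting this crawl's pages
    isMigrating: Optional[bool] = False
    # bumped to 2 once pages are optimized; new crawls and uploads start at 2
    version: Optional[int] = None


def include_replay_optimizations(crawls: List[BaseCrawl]) -> bool:
    """replay.json only includes initialPages, pageQueryUrl and
    preloadResources when every relevant crawl/upload is already on v2"""
    return all(crawl.version == 2 for crawl in crawls)
```
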
Query Optimizations:
- Remove all uses of $group and $facet
- Optimize /replay.json endpoints to precompute `preload_resources` and avoid
fetching the crawl list twice
- Optimize /collections endpoint by not fetching resources 
- Rename /urls -> /pageUrlCounts and avoid $group; instead, sort with an
index (either by seed + ts or by url) to get top matches
- Use $gte instead of $regex to get prefix matches on URL
- Use $text instead of $regex for text search on title (see the query sketch
after this list)
- Remove total from /pages and /pageUrlCounts queries by not using
$facet
- frontend: only call /pageUrlCounts when the dialog is opened

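The two $regex replacements above boil down to using index-friendly operators; a hedged sketch of the query shapes (the collection, field names, and example values are assumptions, and the $text query requires a text index on the title field):

```python
# Prefix match on URL: a $regex prefix match is harder for the planner to
# serve from an index (and impossible if case-insensitive), while a simple
# range condition on an indexed `url` field is an ordinary index scan.
# Sorting by url and limiting then yields the top matches for the prefix.
prefix = "https://example.com/"
prefix_query = {"url": {"$gte": prefix}}
# cursor = pages.find(prefix_query).sort("url", 1).limit(10)

# Text search on title: requires a text index, e.g.
#   await pages.create_index([("title", "text")])
# after which $text uses that index instead of scanning every document
# with a case-insensitive $regex.
title_query = {"$text": {"$search": "home page"}}
# cursor = pages.find(title_query).limit(10)
```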

---------

Co-authored-by: Ilya Kreymer <[email protected]>
Co-authored-by: Emma Segal-Grossman <[email protected]>
Co-authored-by: Ilya Kreymer <[email protected]>
4 people authored Feb 20, 2025
1 parent f7cd476 commit f8fb2d2
Showing 31 changed files with 790 additions and 526 deletions.
193 changes: 167 additions & 26 deletions backend/btrixcloud/background_jobs.py
@@ -23,11 +23,13 @@
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
User,
SuccessResponse,
SuccessResponseId,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now
@@ -52,6 +54,8 @@ class BackgroundJobOps:
base_crawl_ops: BaseCrawlOps
profile_ops: ProfileOps

migration_jobs_scale: int

# pylint: disable=too-many-locals, too-many-arguments, invalid-name

def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
@@ -67,6 +71,8 @@ def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops
self.base_crawl_ops = cast(BaseCrawlOps, None)
self.profile_ops = cast(ProfileOps, None)

self.migration_jobs_scale = int(os.environ.get("MIGRATION_JOBS_SCALE", 1))

self.router = APIRouter(
prefix="/jobs",
tags=["jobs"],
@@ -382,6 +388,7 @@ async def create_re_add_org_pages_job(
self,
oid: UUID,
crawl_type: Optional[str] = None,
crawl_id: Optional[str] = None,
existing_job_id: Optional[str] = None,
):
"""Create job to (re)add all pages in an org, optionally filtered by crawl type"""
@@ -390,6 +397,7 @@ async def create_re_add_org_pages_job(
job_id = await self.crawl_manager.run_re_add_org_pages_job(
oid=str(oid),
crawl_type=crawl_type,
crawl_id=crawl_id,
existing_job_id=existing_job_id,
)
if existing_job_id:
@@ -410,6 +418,7 @@ async def create_re_add_org_pages_job(
id=job_id,
oid=oid,
crawl_type=crawl_type,
crawl_id=crawl_id,
started=dt_now(),
)

@@ -424,18 +433,58 @@ async def create_re_add_org_pages_job(
print(f"warning: re-add org pages job could not be started: {exc}")
return None

async def create_optimize_crawl_pages_job(
self,
existing_job_id: Optional[str] = None,
):
"""Create job to optimize crawl pages"""

try:
job_id = await self.crawl_manager.run_optimize_pages_job(
existing_job_id=existing_job_id, scale=self.migration_jobs_scale
)
if existing_job_id:
optimize_pages_job = await self.get_background_job(existing_job_id)
previous_attempt = {
"started": optimize_pages_job.started,
"finished": optimize_pages_job.finished,
}
if optimize_pages_job.previousAttempts:
optimize_pages_job.previousAttempts.append(previous_attempt)
else:
optimize_pages_job.previousAttempts = [previous_attempt]
optimize_pages_job.started = dt_now()
optimize_pages_job.finished = None
optimize_pages_job.success = None
else:
optimize_pages_job = OptimizePagesJob(
id=job_id,
started=dt_now(),
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": optimize_pages_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: optimize pages job could not be started: {exc}")
return None

async def job_finished(
self,
job_id: str,
job_type: str,
oid: UUID,
success: bool,
finished: datetime,
oid: Optional[UUID] = None,
) -> None:
"""Update job as finished, including
job-specific task handling"""

job = await self.get_background_job(job_id, oid)
job = await self.get_background_job(job_id)
if job.finished:
return

@@ -455,14 +504,16 @@ async def job_finished(
flush=True,
)
superuser = await self.user_manager.get_superuser()
org = await self.org_ops.get_org_by_id(job.oid)
org = None
if job.oid:
org = await self.org_ops.get_org_by_id(job.oid)
await asyncio.get_event_loop().run_in_executor(
None,
self.email.send_background_job_failed,
job,
org,
finished,
superuser.email,
org,
)

await self.jobs.find_one_and_update(
@@ -478,6 +529,7 @@ async def get_background_job(
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
OptimizePagesJob,
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
@@ -504,11 +556,14 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.READD_ORG_PAGES:
return ReAddOrgPagesJob.from_dict(data)

if data["type"] == BgJobType.OPTIMIZE_PAGES:
return OptimizePagesJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
self,
org: Organization,
org: Optional[Organization] = None,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
@@ -522,7 +577,10 @@ async def list_background_jobs(
page = page - 1
skip = page_size * page

query: dict[str, object] = {"oid": org.id}
query: dict[str, object] = {}

if org:
query["oid"] = org.id

if success in (True, False):
query["success"] = success
@@ -590,10 +648,10 @@ async def get_replica_job_file(
raise HTTPException(status_code=404, detail="file_not_found")

async def retry_background_job(
self, job_id: str, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
self, job_id: str, org: Optional[Organization] = None
):
"""Retry background job"""
job = await self.get_background_job(job_id, org.id)
job = await self.get_background_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="job_not_found")

@@ -603,7 +661,23 @@ async def retry_background_job(
if job.success:
raise HTTPException(status_code=400, detail="job_already_succeeded")

if org:
return await self.retry_org_background_job(job, org)

if job.type == BgJobType.OPTIMIZE_PAGES:
await self.create_optimize_crawl_pages_job(
existing_job_id=job_id,
)
return {"success": True}

return {"success": False}

async def retry_org_background_job(
self, job: BackgroundJob, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry background job specific to one org"""
if job.type == BgJobType.CREATE_REPLICA:
job = cast(CreateReplicaJob, job)
file = await self.get_replica_job_file(job, org)
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
@@ -618,10 +692,12 @@ async def retry_background_job(
job.replica_storage,
primary_file_path,
primary_endpoint,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.DELETE_REPLICA:
job = cast(DeleteReplicaJob, job)
file = await self.get_replica_job_file(job, org)
await self.create_delete_replica_job(
org,
@@ -630,31 +706,39 @@ async def retry_background_job(
job.object_type,
job.replica_storage,
force_start_immediately=True,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.DELETE_ORG:
job = cast(DeleteOrgJob, job)
await self.create_delete_org_job(
org,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.RECALCULATE_ORG_STATS:
job = cast(RecalculateOrgStatsJob, job)
await self.create_recalculate_org_stats_job(
org,
existing_job_id=job_id,
existing_job_id=job.id,
)
return {"success": True}

if job.type == BgJobType.READD_ORG_PAGES:
job = cast(ReAddOrgPagesJob, job)
await self.create_re_add_org_pages_job(
org.id,
job.crawl_type,
existing_job_id=job_id,
job.crawl_id,
existing_job_id=job.id,
)
return {"success": True}

return {"success": True}
return {"success": False}

async def retry_failed_background_jobs(
async def retry_failed_org_background_jobs(
self, org: Organization
) -> Dict[str, Union[bool, Optional[str]]]:
"""Retry all failed background jobs in an org
Expand All @@ -679,7 +763,9 @@ async def retry_all_failed_background_jobs(
"""
bg_tasks = set()
async for job in self.jobs.find({"success": False}):
org = await self.org_ops.get_org_by_id(job["oid"])
org = None
if job.get("oid"):
org = await self.org_ops.get_org_by_id(job["oid"])
task = asyncio.create_task(self.retry_background_job(job["_id"], org))
bg_tasks.add(task)
task.add_done_callback(bg_tasks.discard)
@@ -707,23 +793,51 @@ def init_background_jobs_api(
"/{job_id}",
response_model=AnyJob,
)
async def get_background_job(
async def get_org_background_job(
job_id: str,
org: Organization = Depends(org_crawl_dep),
):
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

return await ops.get_background_job(job_id)

@router.post("/{job_id}/retry", response_model=SuccessResponse)
async def retry_background_job(
@app.post(
"/orgs/all/jobs/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"]
)
async def retry_background_job_no_org(job_id: str, user: User = Depends(user_dep)):
"""Retry backgound job that doesn't belong to an org, e.g. migration job"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

job = await ops.get_background_job(job_id)

org = None
if job.oid:
org = await ops.org_ops.get_org_by_id(job.oid)

return await ops.retry_background_job(job_id, org)

@app.post(
"/orgs/all/jobs/migrateCrawls", response_model=SuccessResponseId, tags=["jobs"]
)
async def create_migrate_crawls_job(user: User = Depends(user_dep)):
"""Launch background job to migrate all crawls to v2 with optimized pages"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

job_id = await ops.create_optimize_crawl_pages_job()

return {"success": True, "id": job_id}

@router.post("/{job_id}/retry", response_model=SuccessResponse, tags=["jobs"])
async def retry_org_background_job(
job_id: str,
org: Organization = Depends(org_crawl_dep),
):
@@ -740,14 +854,41 @@ async def retry_all_failed_background_jobs(user: User = Depends(user_dep)):

return await ops.retry_all_failed_background_jobs()

@router.post("/retryFailed", response_model=SuccessResponse)
async def retry_failed_background_jobs(
@router.post("/retryFailed", response_model=SuccessResponse, tags=["jobs"])
async def retry_failed_org_background_jobs(
org: Organization = Depends(org_crawl_dep),
):
"""Retry failed background jobs"""
return await ops.retry_failed_background_jobs(org)
return await ops.retry_failed_org_background_jobs(org)

@app.get(
"/orgs/all/jobs", response_model=PaginatedBackgroundJobResponse, tags=["jobs"]
)
async def list_all_background_jobs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
jobType: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
user: User = Depends(user_dep),
):
"""Retrieve paginated list of background jobs"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

jobs, total = await ops.list_background_jobs(
org=None,
page_size=pageSize,
page=page,
success=success,
job_type=jobType,
sort_by=sortBy,
sort_direction=sortDirection,
)
return paginated_format(jobs, total, page, pageSize)

@router.get("", response_model=PaginatedBackgroundJobResponse)
@router.get("", response_model=PaginatedBackgroundJobResponse, tags=["jobs"])
async def list_background_jobs(
org: Organization = Depends(org_crawl_dep),
pageSize: int = DEFAULT_PAGE_SIZE,
@@ -759,7 +900,7 @@ async def list_background_jobs(
):
"""Retrieve paginated list of background jobs"""
jobs, total = await ops.list_background_jobs(
org,
org=org,
page_size=pageSize,
page=page,
success=success,