feat: integrate seqvars inhouse rocksdb in worker call (#2069) #2070

Merged
1 change: 1 addition & 0 deletions backend/config/celery.py
@@ -32,6 +32,7 @@
"*.clear_*": {"queue": "maintenance"},
"*.compute_*": {"queue": "maintenance"},
"*.sync_*": {"queue": "maintenance"},
"*.run_*inhousedbbuild*": {"queue": "maintenance"},
}

# Explicitly set the name of the default queue to "default" (Celery's default is "celery").
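For context, Celery matches task_routes keys against the fully qualified task name using fnmatch-style globs. A minimal sketch of how the new pattern picks up the build task; the dotted task name below is hypothetical, since the task registration itself is not part of this diff:

from fnmatch import fnmatch

# Hypothetical fully qualified task name; the real one depends on how the
# task is registered in the repo.
task_name = "seqvars.tasks.run_seqvarsinhousedbbuildbackgroundjob"
# Celery resolves string route patterns with glob semantics like this check:
assert fnmatch(task_name, "*.run_*inhousedbbuild*")  # -> queue "maintenance"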
3 changes: 3 additions & 0 deletions backend/config/settings/base.py
@@ -502,6 +502,9 @@
# Path to database for the worker (base database with sub entries for mehari etc.).
WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", "/data/varfish-static/data")

# Writeable path to database for the worker (e.g., for in-house data).
WORKER_RW_PATH = env.str("VARFISH_WORKER_RW_PATH", "/data/varfish-dynamic/data")

# Path to executable for worker.
WORKER_EXE_PATH = env.str("VARFISH_WORKER_EXE_PATH", "varfish-server-worker")

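For orientation, a sketch of the directory layout that the worker code further down in this diff expects under the writable path (the base path and release below are illustrative defaults):

import pathlib

# Default from the setting above; override via VARFISH_WORKER_RW_PATH.
rw_base = pathlib.Path("/data/varfish-dynamic/data")
# Layout probed by execute_query() below: .../inhouse/<release>/active/rocksdb
inhouse_db = rw_base / "worker" / "seqvars" / "inhouse" / "grch38" / "active" / "rocksdb"
if inhouse_db.exists():
    print(f"in-house RocksDB available at {inhouse_db}")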
34 changes: 34 additions & 0 deletions backend/seqvars/management/commands/buildseqvarsinhousedb.py
@@ -0,0 +1,34 @@
"""Manually (re-)build the seqvars inhouse database."""

import traceback

from django.core.management.base import BaseCommand, CommandError

from seqvars.models.base import SeqvarsInhouseDbBuildBackgroundJob
from seqvars.models.executors import run_seqvarsbuildinhousedbbackgroundjob


class Command(BaseCommand):
#: Help message displayed on the command line.
help = "(Re-) build the seqvars inhouse database."

def handle(self, *_args, **options):
"""Entrypoint from command line"""

try:
self._handle()
except Exception as e:
self.stderr.write(
self.style.ERROR(
"A problem occured (see below).\n\n--- BEGIN TRACEBACK ---\n"
f"{traceback.format_exc()}--- END TRACEBACK ---\n"
)
)
raise CommandError("Could not re-build the seqvars inhouse db.") from e

Comment on lines +15 to +28 (Contributor)

🛠️ Refactor suggestion

Fix typo and consider using logging instead of stderr.

  1. There's a typo in the error message: "occured" should be "occurred"
  2. Consider using Django's logging framework instead of direct stderr writes for better consistency and configurability.
-                    "A problem occured (see below).\n\n--- BEGIN TRACEBACK ---\n"
+                    "A problem occurred (see below).\n\n--- BEGIN TRACEBACK ---\n"

Consider refactoring to use logging:

import logging

logger = logging.getLogger(__name__)

# In the exception handler:
logger.error("Could not re-build the seqvars inhouse db:\n%s", traceback.format_exc())

def _handle(self):
# Create a new job to execute.
job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
self.stderr.write(self.style.SUCCESS("Executing seqvars inhouse db build job..."))
run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)
self.stderr.write(self.style.SUCCESS("... done executing job"))
Comment on lines +29 to +34 (Contributor)

🛠️ Refactor suggestion

Consider enhancing job execution robustness and user feedback.

The current implementation has several areas for improvement:

  1. The job execution is synchronous, which could block the command for long-running operations
  2. There's no progress feedback during execution
  3. Missing validation before starting the job
  4. No cleanup handling in case of failure

Consider implementing these improvements:

 def _handle(self):
+    # Check if any job is already running
+    if SeqvarsInhouseDbBuildBackgroundJob.objects.filter(status="running").exists():
+        raise CommandError("Another build job is already running")
+
     # Create a new job to execute.
     job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
-    self.stderr.write(self.style.SUCCESS("Executing seqvars inhouse db build job..."))
-    run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)
-    self.stderr.write(self.style.SUCCESS("... done executing job"))
+    try:
+        self.stderr.write(self.style.SUCCESS(f"Started job with ID: {job.pk}"))
+        self.stderr.write("You can monitor the progress in the admin interface or logs")
+        
+        # Option 1: Run asynchronously
+        run_seqvarsbuildinhousedbbackgroundjob.delay(pk=job.pk)
+        
+        # Option 2: Run synchronously with progress
+        # run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)
+        # while not job.refresh_from_db():
+        #     self.stderr.write(f"Progress: {job.progress}%")
+        #     time.sleep(5)
+    except Exception as e:
+        job.status = "failed"
+        job.error_message = str(e)
+        job.save()
+        raise

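For reference, the command can be invoked programmatically as well; a minimal sketch using Django's stock management API:

from django.core.management import call_command

# Equivalent to running `python manage.py buildseqvarsinhousedb`.
call_command("buildseqvarsinhousedb")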
@@ -0,0 +1,43 @@
# Generated by Django 3.2.25 on 2024-10-30 07:38

import uuid

import bgjobs.models
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
("bgjobs", "0006_auto_20200526_1657"),
("seqvars", "0014_seqvarsquerypresetsset_is_factory_default"),
]

operations = [
migrations.CreateModel(
name="SeqvarsInhouseDbBuildBackgroundJob",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
("sodar_uuid", models.UUIDField(default=uuid.uuid4, unique=True)),
(
"bg_job",
models.ForeignKey(
help_text="Background job for state etc.",
on_delete=django.db.models.deletion.CASCADE,
related_name="seqvarsinhousedbbuildbackgroundjob",
to="bgjobs.backgroundjob",
),
),
],
options={
"ordering": ["-pk"],
},
bases=(bgjobs.models.JobModelMessageMixin, models.Model),
),
]
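Applying the migration follows the standard Django flow; a sketch of the corresponding deployment step:

from django.core.management import call_command

# Applies pending seqvars migrations, including the new background-job model.
call_command("migrate", "seqvars")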
58 changes: 56 additions & 2 deletions backend/seqvars/models/base.py
@@ -22,6 +22,7 @@
import uuid as uuid_object

from bgjobs.models import BackgroundJob, JobModelMessageMixin
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import models, transaction
from django_pydantic_field.v2.fields import PydanticSchemaField as SchemaField
@@ -2431,7 +2432,7 @@ class Meta:


class SeqvarsQueryExecutionBackgroundJobManager(models.Manager):
"""Custom manager class that allows to create a ``SeqvarsQueryExeuctionBackgroundJob``
"""Custom manager class that allows to create a ``SeqvarsQueryExecutionBackgroundJob``
together with the backing ``BackgroundJob``.
"""

@@ -2486,7 +2487,7 @@ class SeqvarsQueryExecutionBackgroundJob(JobModelMessageMixin, models.Model):
on_delete=models.CASCADE,
)

#: The case import action to perform.
#: The query execution to run for.
seqvarsqueryexecution = models.ForeignKey(
SeqvarsQueryExecution, on_delete=models.CASCADE, null=False
)
@@ -2496,3 +2497,56 @@ def get_human_readable_type(self):

class Meta:
ordering = ["-pk"]


class SeqvarsInhouseDbBuildBackgroundJobManager(models.Manager):
"""Custom manager class that allows to create a ``SeqvarsInhouseDbBuildBackgroundJob``
together with the backing ``BackgroundJob``.
"""

@transaction.atomic
def create_full(self):
bg_job = BackgroundJob.objects.create(
name="Building seqvars inhouse DB",
project=None,
job_type=SeqvarsInhouseDbBuildBackgroundJob.spec_name,
user=User.objects.get(username=settings.PROJECTROLES_ADMIN_OWNER),
)
instance = super().create(
bg_job=bg_job,
)
return instance


class SeqvarsInhouseDbBuildBackgroundJob(JobModelMessageMixin, models.Model):
"""Background job for re-building the seqvars inhouse DB."""

# We use a custom manager that provides creation together with the ``BackgroundJob``.
objects: SeqvarsInhouseDbBuildBackgroundJobManager = SeqvarsInhouseDbBuildBackgroundJobManager()

#: Task description for logging.
task_desc = "Seqvars Query Execution"

#: String identifying model in BackgroundJob.
spec_name = "seqvars.runinhousedbbuild"

#: The SODAR UUID.
sodar_uuid = models.UUIDField(
default=uuid_object.uuid4,
unique=True,
)

#: The background job for state management etc.
bg_job = models.ForeignKey(
BackgroundJob,
null=False,
related_name="seqvarsinhousedbbuildbackgroundjob",
help_text="Background job for state etc.",
on_delete=models.CASCADE,
)

def get_human_readable_type(self):
return self.task_desc

class Meta:
ordering = ["-pk"]
Comment on lines +2502 to +2552 (Contributor)

⚠️ Potential issue

Multiple issues in SeqvarsInhouseDbBuildBackgroundJob implementation.

  1. The task description is incorrect (copied from query execution job)
  2. The create_full method implementation could be improved
  3. Missing documentation

Apply these fixes:

class SeqvarsInhouseDbBuildBackgroundJobManager(models.Manager):
    """Custom manager class that allows to create a ``SeqvarsInhouseDbBuildBackgroundJob``
    together with the backing ``BackgroundJob``.
    """

    @transaction.atomic
-   def create_full(self):
+   def create_full(self, *, user: typing.Optional[User] = None) -> "SeqvarsInhouseDbBuildBackgroundJob":
+       """Create a new background job for building the in-house database.
+
+       :param user: The user to associate with the job. If None, uses the admin user.
+       :return: The created background job instance.
+       """
+       if user is None:
+           user = User.objects.get(username=settings.PROJECTROLES_ADMIN_OWNER)
        bg_job = BackgroundJob.objects.create(
            name="Building seqvars inhouse DB",
            project=None,
            job_type=SeqvarsInhouseDbBuildBackgroundJob.spec_name,
-           user=User.objects.get(username=settings.PROJECTROLES_ADMIN_OWNER),
+           user=user,
        )
        instance = super().create(
            bg_job=bg_job,
        )
        return instance

class SeqvarsInhouseDbBuildBackgroundJob(JobModelMessageMixin, models.Model):
    """Background job for re-building the seqvars inhouse DB."""

    objects: SeqvarsInhouseDbBuildBackgroundJobManager = SeqvarsInhouseDbBuildBackgroundJobManager()

    #: Task description for logging.
-   task_desc = "Seqvars Query Execution"
+   task_desc = "Build Seqvars In-house Database"

The changes:

  1. Fix the task description to accurately reflect the job's purpose
  2. Make the create_full method more flexible by accepting an optional user parameter
  3. Add proper documentation to the create_full method
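For orientation, the manager creates the wrapper row and its backing BackgroundJob inside a single transaction; a minimal usage sketch mirroring the management command above:

from seqvars.models.base import SeqvarsInhouseDbBuildBackgroundJob
from seqvars.models.executors import run_seqvarsbuildinhousedbbackgroundjob

# create_full() inserts both rows atomically; if either insert fails,
# transaction.atomic rolls the other back.
job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)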
164 changes: 144 additions & 20 deletions backend/seqvars/models/executors.py
@@ -1,4 +1,6 @@
import datetime
import os
import pathlib
import subprocess
import sys
import tempfile
@@ -8,12 +10,14 @@
from django.conf import settings
from django.utils import timezone
from google.protobuf.json_format import MessageToJson, Parse
from projectroles.templatetags.projectroles_common_tags import get_app_setting

from cases_files.models import PedigreeInternalFile
from cases_import.models.executors import FileSystemOptions, FileSystemWrapper, uuid_frag
from seqvars.models.base import (
DataSourceInfoPydantic,
DataSourceInfosPydantic,
SeqvarsInhouseDbBuildBackgroundJob,
SeqvarsQueryExecution,
SeqvarsQueryExecutionBackgroundJob,
SeqvarsResultRow,
@@ -25,10 +29,33 @@
seqvars_output_record_from_protobuf,
)
from seqvars.protos.output_pb2 import OutputHeader, OutputRecord
from variants.models.case import Case


class CaseImportBackgroundJobExecutor:
"""Implementation of ``CaseImportBackgroundJob`` execution."""
def aws_config_env_internal() -> dict[str, str]:
"""Build AWS config directory fragment for internal storage."""
return {
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": (
f"http://{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host}"
f":{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port}"
),
"AWS_REGION": "us-east-1",
}


def run_worker(*, args: list[str], env: typing.Dict[str, str] | None = None):
"""Run the worker with the given arguments.

The worker will create a new VCF file and a TBI file.
"""
cmd = [settings.WORKER_EXE_PATH, *args]
subprocess.check_call(cmd, env=env)


class SeqvarsQueryExecutionBackgroundJobExecutor:
"""Implementation of ``SeqvarsQueryExecutionBackgroundJob`` execution."""

def __init__(self, job_pk: int):
#: Job record primary key.
@@ -128,37 +155,36 @@ def execute_query(self, tmpdir: str) -> str:
"--path-output",
f"{bucket}/{self.path_internal_results}",
]
# Extend the arguments with the in-house database if one exists.
worker_rw_path = pathlib.Path(settings.WORKER_DB_PATH)
path_inhouse = (
worker_rw_path
/ "worker"
/ "seqvars"
/ "inhouse"
/ vcf_genome_release
/ "active"
/ "rocksdb"
)
if path_inhouse.exists():
args.extend(["--path-inhouse-db", str(path_inhouse)])
# Setup environment so the worker can access the internal S3 storage.
env = {
**dict(os.environ.items()),
"LC_ALL": "C",
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": (
f"http://{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host}"
f":{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port}"
),
"AWS_REGION": "us-east-1",
**aws_config_env_internal(),
}
Comment on lines +175 to 176 (Contributor)

⚠️ Potential issue

Avoid merging all environment variables into the subprocess

When constructing the env dictionary, merging the entire os.environ can introduce unintended environment variables into the subprocess, which might affect its behavior or security. It is safer to specify only the necessary environment variables explicitly.

Apply this diff to specify only the required environment variables:

-        env = {
-            **dict(os.environ.items()),
-            "LC_ALL": "C",
-            **aws_config_env_internal(),
-        }
+        env = {
+            "LC_ALL": "C",
+            **aws_config_env_internal(),
+        }

Committable suggestion was skipped due to low confidence.

# Actually execute the query with the worker.
try:
self.run_worker(
run_worker(
args=args,
env=env,
)
except Exception:
pass
print("Error while executing worker / importing results", file=sys.stderr)
Comment on lines +179 to +184 (Contributor)

⚠️ Potential issue

Exceptions in execute_query are not properly handled

In execute_query, if an exception occurs during run_worker, the exception is caught, and an error message is printed, but the exception is not re-raised or handled further. This may lead to the program continuing in an invalid state. Consider re-raising the exception to ensure that the calling method is aware of the failure and can handle it appropriately.

Apply this diff to re-raise the exception after logging:

            except Exception as e:
                print("Error while executing worker / importing results", file=sys.stderr)
+               raise e


return vcf_genome_release

def run_worker(self, *, args: list[str], env: typing.Dict[str, str] | None = None):
"""Run the worker with the given arguments.

The worker will create a new VCF file and a TBI file.
"""
cmd = [settings.WORKER_EXE_PATH, *args]
subprocess.check_call(cmd, env=env)

def load_results(self, *, genome_release: str):
"""Load the results from the internal storage and store in database."""
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
Expand Down Expand Up @@ -205,5 +231,103 @@ def load_results(self, *, genome_release: str):

def run_seqvarsqueryexecutionbackgroundjob(*, pk: int):
"""Execute the work for a ``SeqvarsQueryExecutionBackgroundJob``."""
executor = CaseImportBackgroundJobExecutor(pk)
executor = SeqvarsQueryExecutionBackgroundJobExecutor(pk)
executor.run()


class InhouseDbBuildBackgroundJobExecutor:
"""Implementation of ``SeqvarsInhouseDbBuildBackgroundJob`` execution."""

def __init__(self, job_pk: int):
#: Job record primary key.
self.job_pk = job_pk
#: The ``SeqvarsQueryExecutionBackgroundJob`` object itself.
self.bgjob: SeqvarsInhouseDbBuildBackgroundJob = (
SeqvarsInhouseDbBuildBackgroundJob.objects.get(pk=self.job_pk)
)

def run(self):
"""Execute building the inhouse database."""
self.run_for_genome_release(genome_release="grch37")
self.run_for_genome_release(genome_release="grch38")

def run_for_genome_release(self, *, genome_release: typing.Literal["grch37", "grch38"]):
"""Execute building the inhouse database for the given genome release."""
# Ensure the output path is present.
worker_rw_path = pathlib.Path(settings.WORKER_DB_PATH)
name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_path = worker_rw_path / "worker" / "seqvars" / "inhouse" / genome_release / name
output_path.mkdir(parents=True, exist_ok=True)
# Prepare the file with paths in S3 for the worker.
num_cases = self.prepare_paths_for_genome_release(
genome_release=genome_release, output_path=output_path
)
if not num_cases:
print(f"No cases to process for {genome_release}, skipping inhouse database build.")
return
# Create arguments to use.
args = [
"seqvars",
"aggregate",
"--genomebuild",
genome_release,
"--path-out-rocksdb",
str(output_path / "rocksdb"),
"--path-input",
f"@{output_path / 'paths.txt'}",
]
# Setup environment so the worker can access the internal S3 storage.
env = {
**dict(os.environ.items()),
"LC_ALL": "C",
**aws_config_env_internal(),
}
# Actually execute the database build with the worker.
try:
print(" ".join(args))
run_worker(
Comment on lines +287 to +288 (Contributor)

⚠️ Potential issue

Avoid printing sensitive command-line arguments

Printing the command-line arguments with print(" ".join(args)) may inadvertently expose sensitive information in logs or console output. If the arguments contain confidential data, consider removing this print statement or sanitizing the output to prevent potential information leakage.

Apply this diff to remove the print statement:

-            print(" ".join(args))

args=args,
env=env,
)
except Exception:
print("Error while executing worker / importing results", file=sys.stderr)
return

# Atomically update the "active" symlink for the release using Unix `rename(2)`.
# This will not work on Windows.
print("Updating symlinks...")
output_path_with_suffix = output_path.with_suffix(".active")
print(f"ln -sr {output_path_with_suffix} {output_path}")
output_path_with_suffix.symlink_to(output_path.relative_to(output_path.parent))
print(f"rename {output_path_with_suffix} {output_path}")
output_path_with_suffix.rename(output_path.with_name("active"))

def prepare_paths_for_genome_release(
self, *, genome_release: typing.Literal["grch37", "grch38"], output_path: pathlib.Path
) -> int:
"""Prepare the paths file for the worker.

For this, we loop over all V2 cases that have the matching inhouse release.
"""
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
paths = []
for case in Case.objects.filter(case_version=2).prefetch_related("project").iterator():
if not get_app_setting("variants", "exclude_from_inhouse_db", project=case.project):
seqvars_file = PedigreeInternalFile.objects.filter(
case=case,
designation="variant_calls/seqvars/ingested-vcf",
)[0]
if seqvars_file.genomebuild == genome_release:
paths.append(f"{bucket}/{seqvars_file.path}")
Comment on lines +316 to +321 (Contributor)

⚠️ Potential issue

Handle potential absence of PedigreeInternalFile objects

When accessing PedigreeInternalFile.objects.filter(...)[0], there's a risk of an IndexError if the queryset is empty. This can happen if no files match the filter criteria. To prevent the application from crashing, use .first() and check if the result is None before proceeding.

Apply this diff to safely handle cases with no matching files:

-                seqvars_file = PedigreeInternalFile.objects.filter(
-                    case=case,
-                    designation="variant_calls/seqvars/ingested-vcf",
-                )[0]
+                seqvars_file = PedigreeInternalFile.objects.filter(
+                    case=case,
+                    designation="variant_calls/seqvars/ingested-vcf",
+                ).first()
+                if not seqvars_file:
+                    continue

paths.sort()

with open(output_path / "paths.txt", "wt") as f:
f.write("\n".join(paths) + "\n")

return len(paths)


def run_seqvarsbuildinhousedbbackgroundjob(*, pk: int):
"""Execute the work for a ``SeqvarsInhouseDbBuildBackgroundJob``."""
executor = InhouseDbBuildBackgroundJobExecutor(pk)
executor.run()
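The symlink update in run_for_genome_release relies on rename(2) being atomic on POSIX; a standalone sketch of the same create-then-rename pattern (function and variable names here are illustrative):

import pathlib

def swap_active_symlink(output_path: pathlib.Path) -> None:
    """Atomically repoint the sibling ``active`` symlink at ``output_path``."""
    tmp = output_path.with_suffix(".active")  # temporary sibling name, as in the PR
    if tmp.is_symlink():
        tmp.unlink()  # drop a stale leftover from an earlier failed run
    # Relative symlink, equivalent to `ln -sr <output_path> <tmp>`.
    tmp.symlink_to(output_path.relative_to(output_path.parent))
    # rename(2) replaces any existing "active" symlink in one atomic step.
    tmp.rename(output_path.with_name("active"))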