Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,21 @@
"title": "Default memory limit for Kubernetes jobs",
"value": "3Gi"
},
"kueue_available_queues": {
"title": "List of local queues available for job processing",
"value": [
"local-queue-1",
"local-queue-2"
]
},
"kueue_default_queue": {
"title": "Default queue to send workflow jobs to",
"value": "local-queue1"
},
"kueue_enabled": {
"title": "Whether Kueue is enabled for job processing",
"value": "False"
},
"maximum_kubernetes_jobs_timeout": {
"title": "Maximum timeout for Kubernetes jobs",
"value": "1209600"
Expand Down Expand Up @@ -750,6 +765,42 @@
},
"type": "object"
},
"kueue_available_queues": {
"properties": {
"title": {
"type": "string"
},
"value": {
"items": {
"type": "string"
},
"type": "array"
}
},
"type": "object"
},
"kueue_default_queue": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"kueue_enabled": {
"properties": {
"title": {
"type": "string"
},
"value": {
"type": "string"
}
},
"type": "object"
},
"maximum_interactive_session_inactivity_period": {
"properties": {
"title": {
Expand Down
11 changes: 11 additions & 0 deletions reana_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@
)
"""Maximum number of threads for one Dask worker."""

KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
"""Whether to use Kueue to manage job execution."""

KUEUE_AVAILABLE_QUEUES: list[dict] = (
json.loads(os.getenv("KUEUE_AVAILABLE_QUEUES", "[]")) or []
)
"""List of local queues available for workflow job processing."""

KUEUE_DEFAULT_QUEUE: str = os.getenv("KUEUE_DEFAULT_QUEUE", "")
"""Default queue to send workflow jobs to."""

REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
"""Maximum memory limit for user job containers for workflow complexity estimation."""

Expand Down
66 changes: 65 additions & 1 deletion reana_server/rest/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
KUEUE_ENABLED,
KUEUE_AVAILABLE_QUEUES,
KUEUE_DEFAULT_QUEUE,
)
from reana_server.decorators import signin_required

Expand Down Expand Up @@ -97,6 +100,29 @@ def info(user, **kwargs): # noqa
value:
type: string
type: object
kueue_enabled:
properties:
title:
type: string
value:
type: string
type: object
kueue_available_queues:
properties:
title:
type: string
value:
items:
type: string
type: array
type: object
kueue_default_queue:
properties:
title:
type: string
value:
type: string
type: object
kubernetes_max_memory_limit:
properties:
title:
Expand Down Expand Up @@ -291,6 +317,21 @@ def info(user, **kwargs): # noqa
"title": "Default memory limit for Kubernetes jobs",
"value": "3Gi"
},
"kueue_enabled": {
"title": "Whether Kueue is enabled for job processing",
"value": "False"
},
"kueue_available_queues": {
"title": "List of local queues available for job processing",
"value": [
"local-queue-1",
"local-queue-2"
]
},
"kueue_default_queue": {
"title": "Default queue to send workflow jobs to",
"value": "local-queue1"
},
"kubernetes_max_memory_limit": {
"title": "Maximum allowed memory limit for Kubernetes jobs",
"value": "10Gi"
Expand Down Expand Up @@ -419,6 +460,10 @@ def info(user, **kwargs): # noqa
title="Default memory limit for Kubernetes jobs",
value=REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
),
kueue_enabled=dict(
title="Kueue enabled for job processing",
value=KUEUE_ENABLED,
),
kubernetes_max_memory_limit=dict(
title="Maximum allowed memory limit for Kubernetes jobs",
value=REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
Expand Down Expand Up @@ -479,6 +524,20 @@ def info(user, **kwargs): # noqa
),
)

if KUEUE_ENABLED:
cluster_information["kueue_available_queues"] = dict(
title="Local queues available for job processing",
value=[
f"{queue["node"]}-{queue["name"]}"
for queue in KUEUE_AVAILABLE_QUEUES
],
)

cluster_information["kueue_default_queue"] = dict(
title="Default queue to send workflow jobs to",
value=KUEUE_DEFAULT_QUEUE or "No default queue set",
)

if DASK_ENABLED:
cluster_information["dask_autoscaler_enabled"] = dict(
title="Dask autoscaler enabled in the cluster",
Expand Down Expand Up @@ -555,7 +614,6 @@ class InfoSchema(Schema):
maximum_interactive_session_inactivity_period = fields.Nested(
StringNullableInfoValue
)
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
interactive_session_recommended_jupyter_images = fields.Nested(ListStringInfoValue)
interactive_sessions_custom_image_allowed = fields.Nested(StringInfoValue)
supported_workflow_engines = fields.Nested(ListStringInfoValue)
Expand All @@ -566,6 +624,7 @@ class InfoSchema(Schema):
yadage_engine_packtivity_version = fields.Nested(StringInfoValue)
snakemake_engine_version = fields.Nested(StringInfoValue)
dask_enabled = fields.Nested(StringInfoValue)

if DASK_ENABLED:
dask_autoscaler_enabled = fields.Nested(StringInfoValue)
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
Expand All @@ -575,3 +634,8 @@ class InfoSchema(Schema):
dask_cluster_max_number_of_workers = fields.Nested(StringInfoValue)
dask_cluster_default_single_worker_threads = fields.Nested(StringInfoValue)
dask_cluster_max_single_worker_threads = fields.Nested(StringInfoValue)

kueue_enabled = fields.Nested(StringInfoValue)
if KUEUE_ENABLED:
kueue_available_queues = fields.Nested(ListStringInfoValue)
kueue_default_queue = fields.Nested(StringNullableInfoValue)
14 changes: 13 additions & 1 deletion reana_server/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

import itertools
import pathlib
from typing import Dict, List
from typing import Dict

from reana_commons.config import WORKSPACE_PATHS
from reana_commons.errors import REANAValidationError
from reana_commons.validation.compute_backends import build_compute_backends_validator
from reana_commons.validation.kubernetes_queues import validate_kubernetes_queues
from reana_commons.validation.operational_options import validate_operational_options
from reana_commons.validation.parameters import build_parameters_validator
from reana_commons.validation.utils import validate_reana_yaml, validate_workspace
Expand All @@ -31,6 +32,9 @@
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
KUEUE_AVAILABLE_QUEUES,
KUEUE_ENABLED,
KUEUE_DEFAULT_QUEUE,
)
from reana_server import utils

Expand Down Expand Up @@ -137,6 +141,14 @@ def validate_workflow(reana_yaml: Dict, input_parameters: Dict) -> Dict:
validate_compute_backends(reana_yaml)
validate_workspace_path(reana_yaml)
validate_inputs(reana_yaml)
validate_kubernetes_queues(
reana_yaml,
KUEUE_ENABLED,
supported_queues=[
f"{queue['node']}-{queue['name']}" for queue in KUEUE_AVAILABLE_QUEUES
],
default_queue=KUEUE_DEFAULT_QUEUE,
)
return reana_yaml_warnings


Expand Down
Loading