Skip to content

Commit 5e6428c

Browse files
authored
Enhance broker support (#191)
1 parent 5bc8b05 commit 5e6428c

File tree

10 files changed

+132
-124
lines changed

10 files changed

+132
-124
lines changed

.github/workflows/test.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ jobs:
8282
run: |
8383
python -m pip --quiet install poetry
8484
echo "$HOME/.poetry/bin" >> $GITHUB_PATH
85-
poetry install -E yaml
85+
if [ ${{ matrix.broker == 'valkey' }} == true ]; then
86+
additional_args="-E valkey"
87+
fi
88+
poetry install -E yaml $additional_args
8689
poetry run pip install django==${{ matrix.django-version }}
8790
8891
- name: Get version

poetry.lock

+74-73
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ croniter = ">=2.0"
4747
click = "^8.1"
4848
rq = "^1.16"
4949
pyyaml = { version = "^6.0", optional = true }
50-
valkey = "6.0.1"
50+
valkey = { version = "^6.0.2", optional = true}
5151

5252
[tool.poetry.dev-dependencies]
5353
poetry = "^1.8.3"
@@ -60,6 +60,7 @@ freezegun = "^1.5"
6060

6161
[tool.poetry.extras]
6262
yaml = ["pyyaml"]
63+
valkey = ["valkey"]
6364

6465
[tool.flake8]
6566
max-line-length = 120

scheduler/admin/task_models.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import redis
2-
import valkey
31
from django.contrib import admin, messages
42
from django.contrib.contenttypes.admin import GenericStackedInline
53
from django.utils.translation import gettext_lazy as _
64

75
from scheduler import tools
6+
from scheduler.broker_types import ConnectionErrorTypes
87
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
98
from scheduler.settings import SCHEDULER_CONFIG, logger
109
from scheduler.tools import get_job_executions_for_task
@@ -186,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
186185
obj = self.get_object(request, object_id)
187186
try:
188187
execution_list = get_job_executions_for_task(obj.queue, obj)
189-
except (redis.ConnectionError, valkey.ConnectionError) as e:
188+
except ConnectionErrorTypes as e:
190189
logger.warn(f"Could not get job executions: {e}")
191190
execution_list = list()
192191
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)

scheduler/broker_types.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import Union, Dict, Tuple, Type
2+
3+
import redis
4+
5+
try:
6+
import valkey
7+
except ImportError:
8+
valkey = redis
9+
valkey.Valkey = redis.Redis
10+
valkey.StrictValkey = redis.StrictRedis
11+
12+
from scheduler.settings import Broker
13+
14+
ConnectionErrorTypes = (redis.ConnectionError, valkey.ConnectionError)
15+
ResponseErrorTypes = (redis.ResponseError, valkey.ResponseError)
16+
ConnectionType = Union[redis.Redis, valkey.Valkey]
17+
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
18+
SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]
19+
20+
BrokerMetaData: Dict[Tuple[Broker, bool], Tuple[Type[ConnectionType], Type[SentinelType], str]] = {
21+
# Map of (Broker, Strict flag) => Connection Class, Sentinel Class, SSL Connection Prefix
22+
(Broker.REDIS, False): (redis.Redis, redis.sentinel.Sentinel, "rediss"),
23+
(Broker.VALKEY, False): (valkey.Valkey, valkey.sentinel.Sentinel, "valkeys"),
24+
(Broker.REDIS, True): (redis.StrictRedis, redis.sentinel.Sentinel, "rediss"),
25+
(Broker.VALKEY, True): (valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"),
26+
}

scheduler/connection_types.py

-19
This file was deleted.

scheduler/management/commands/rqworker.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
import sys
44

55
import click
6-
import redis
7-
import valkey
86
from django.core.management.base import BaseCommand
97
from django.db import connections
108
from rq.logutils import setup_loghandlers
119

10+
from scheduler.broker_types import ConnectionErrorTypes
1211
from scheduler.rq_classes import register_sentry
1312
from scheduler.tools import create_worker
1413

@@ -136,6 +135,6 @@ def handle(self, **options):
136135
logging_level=log_level,
137136
max_jobs=options["max_jobs"],
138137
)
139-
except (redis.ConnectionError, valkey.ConnectionError) as e:
138+
except ConnectionErrorTypes as e:
140139
click.echo(str(e), err=True)
141140
sys.exit(1)

scheduler/queues.py

+17-18
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
from typing import List, Dict, Set
22

3-
import redis
4-
import valkey
5-
6-
from .connection_types import RedisSentinel, BrokerConnectionClass
3+
from .broker_types import ConnectionErrorTypes, BrokerMetaData
74
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
85
from .settings import SCHEDULER_CONFIG
96
from .settings import logger, Broker
@@ -28,31 +25,32 @@ class QueueNotFoundError(Exception):
2825
pass
2926

3027

31-
def _get_redis_connection(config, use_strict_redis=False):
28+
def _get_broker_connection(config, use_strict_broker=False):
3229
"""
3330
Returns a redis connection from a connection config
3431
"""
3532
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
3633
import fakeredis
3734

38-
redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
35+
broker_cls = fakeredis.FakeRedis if not use_strict_broker else fakeredis.FakeStrictRedis
3936
else:
40-
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
37+
broker_cls = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][0]
4138
logger.debug(f"Getting connection for {config}")
4239
if "URL" in config:
43-
if config.get("SSL") or config.get("URL").startswith("rediss://"):
44-
return redis_cls.from_url(
40+
ssl_url_protocol = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][2]
41+
if config.get("SSL") or config.get("URL").startswith(f"{ssl_url_protocol}://"):
42+
return broker_cls.from_url(
4543
config["URL"],
4644
db=config.get("DB"),
4745
ssl_cert_reqs=config.get("SSL_CERT_REQS", "required"),
4846
)
4947
else:
50-
return redis_cls.from_url(
48+
return broker_cls.from_url(
5149
config["URL"],
5250
db=config.get("DB"),
5351
)
5452
if "UNIX_SOCKET_PATH" in config:
55-
return redis_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
53+
return broker_cls(unix_socket_path=config["UNIX_SOCKET_PATH"], db=config["DB"])
5654

5755
if "SENTINELS" in config:
5856
connection_kwargs = {
@@ -63,13 +61,14 @@ def _get_redis_connection(config, use_strict_redis=False):
6361
}
6462
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
6563
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
66-
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
64+
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)][1]
65+
sentinel = SentinelClass(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
6766
return sentinel.master_for(
6867
service_name=config["MASTER_NAME"],
69-
redis_class=redis_cls,
68+
redis_class=broker_cls,
7069
)
7170

72-
return redis_cls(
71+
return broker_cls(
7372
host=config["HOST"],
7473
port=config["PORT"],
7574
db=config.get("DB", 0),
@@ -82,8 +81,8 @@ def _get_redis_connection(config, use_strict_redis=False):
8281

8382

8483
def get_connection(queue_settings, use_strict_redis=False):
85-
"""Returns a Redis connection to use based on parameters in SCHEDULER_QUEUES"""
86-
return _get_redis_connection(queue_settings, use_strict_redis)
84+
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
85+
return _get_broker_connection(queue_settings, use_strict_redis)
8786

8887

8988
def get_queue(
@@ -116,7 +115,7 @@ def get_all_workers() -> Set[DjangoWorker]:
116115
try:
117116
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
118117
workers_set.update(curr_workers)
119-
except (redis.ConnectionError, valkey.ConnectionError) as e:
118+
except ConnectionErrorTypes as e:
120119
logger.error(f"Could not connect for queue {queue_name}: {e}")
121120
return workers_set
122121

@@ -142,7 +141,7 @@ def get_queues(*queue_names, **kwargs) -> List[DjangoQueue]:
142141
for name in queue_names[1:]:
143142
if not _queues_share_connection_params(queue_params, QUEUES[name]):
144143
raise ValueError(
145-
f'Queues must have the same redis connection. "{name}" and'
144+
f'Queues must have the same broker connection. "{name}" and'
146145
f' "{queue_names[0]}" have different connections'
147146
)
148147
queue = get_queue(name, **kwargs)

scheduler/rq_classes.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from rq.worker import WorkerStatus
2323

2424
from scheduler import settings
25-
from scheduler.connection_types import PipelineType, ConnectionType
25+
from scheduler.broker_types import PipelineType, ConnectionType
2626

2727
MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]
2828

scheduler/views.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from math import ceil
33
from typing import Tuple, Optional
44

5-
import redis
65
from django.contrib import admin, messages
76
from django.contrib.admin.views.decorators import staff_member_required
87
from django.core.paginator import Paginator
@@ -12,8 +11,8 @@
1211
from django.shortcuts import render
1312
from django.urls import reverse, resolve
1413
from django.views.decorators.cache import never_cache
15-
from redis.exceptions import ResponseError
1614

15+
from .broker_types import ConnectionErrorTypes, ResponseErrorTypes
1716
from .queues import get_all_workers, get_connection, QueueNotFoundError
1817
from .queues import get_queue as get_queue_base
1918
from .rq_classes import JobExecution, DjangoWorker, DjangoQueue, InvalidJobOperation
@@ -71,7 +70,7 @@ def get_statistics(run_maintenance_tasks=False):
7170
if run_maintenance_tasks:
7271
queue.clean_registries()
7372

74-
# Raw access to the first item from left of the redis list.
73+
# Raw access to the first item from left of the broker list.
7574
# This might not be accurate since new job can be added from the left
7675
# with `at_front` parameters.
7776
# Ideally rq should supports Queue.oldest_job
@@ -102,7 +101,7 @@ def get_statistics(run_maintenance_tasks=False):
102101
canceled_jobs=len(queue.canceled_job_registry),
103102
)
104103
queues.append(queue_data)
105-
except redis.ConnectionError as e:
104+
except ConnectionErrorTypes as e:
106105
logger.error(f"Could not connect for queue {queue_name}: {e}")
107106
continue
108107

@@ -277,7 +276,7 @@ def clear_queue_registry(request: HttpRequest, queue_name: str, registry_name: s
277276
for job_id in job_ids:
278277
registry.remove(job_id, delete_job=True)
279278
messages.info(request, f"You have successfully cleared the {registry_name} jobs in queue {queue.name}")
280-
except ResponseError as e:
279+
except ResponseErrorTypes as e:
281280
messages.error(
282281
request,
283282
f"error: {e}",

0 commit comments

Comments
 (0)