Skip to content

Commit 16fd35b

Browse files
authored
use Lua script to pop the task atomically (#51)
* use lua script to pop the task atomically * docs update * fix phpstan * update docs * cs fix
1 parent 847079f commit 16fd35b

10 files changed

+84
-49
lines changed

.php-cs-fixer.dist.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@
3737
return Factory::create(new CodeIgniter4(), $overrides, $options)->forLibrary(
3838
'CodeIgniter Queue',
3939
'CodeIgniter Foundation',
40-
40+
4141
);

docs/index.md

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ If you use `database` handler:
2626
- Oracle 12.1+
2727
- SQLite3
2828

29+
If you use `Redis` (you still need a relational database to store failed jobs):
30+
31+
- PHPRedis
32+
- Predis
33+
2934
### Table of Contents
3035

3136
* [Installation](installation.md)

docs/running-queues.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ This way, worker will consume jobs with the `low` priority and then with `high`.
6565

6666
### Running many instances of the same queue
6767

68-
As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases if you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver this setting is not relevant.
68+
As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control.
69+
70+
The PHPRedis and Predis drivers are also safe to use with multiple instances of the same command.
6971

7072
### Handling long-running process
7173

72-
If we decide to run the long process e.g. with the command:
74+
If we decide to run the long process, e.g., with the command:
7375

7476
php spark queue:work emails -wait 10
7577

mkdocs.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ extra:
5353
site_url: https://queue.codeigniter.com/
5454
repo_url: https://github.com/codeigniter4/queue
5555
edit_uri: edit/develop/docs/
56-
copyright: Copyright © 2023 CodeIgniter Foundation.
56+
copyright: Copyright © 2025 CodeIgniter Foundation.
5757

5858
markdown_extensions:
5959
- admonition

src/Commands/QueueWork.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public function run(array $params)
108108
$memory,
109109
$priority,
110110
$tries,
111-
$retryAfter
111+
$retryAfter,
112112
] = $this->readOptions($params, $config, $queue);
113113

114114
if ($error !== null) {

src/Handlers/BaseHandler.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ public function retry(?int $id, ?string $queue): int
7272
$jobs = model(QueueJobFailedModel::class)
7373
->when(
7474
$id !== null,
75-
static fn ($query) => $query->where('id', $id)
75+
static fn ($query) => $query->where('id', $id),
7676
)
7777
->when(
7878
$queue !== null,
79-
static fn ($query) => $query->where('queue', $queue)
79+
static fn ($query) => $query->where('queue', $queue),
8080
)
8181
->findAll();
8282

@@ -112,11 +112,11 @@ public function flush(?int $hours, ?string $queue): bool
112112
return model(QueueJobFailedModel::class)
113113
->when(
114114
$hours !== null,
115-
static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp)
115+
static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp),
116116
)
117117
->when(
118118
$queue !== null,
119-
static fn ($query) => $query->where('queue', $queue)
119+
static fn ($query) => $query->where('queue', $queue),
120120
)
121121
->delete();
122122
}
@@ -129,7 +129,7 @@ public function listFailed(?string $queue): array
129129
return model(QueueJobFailedModel::class)
130130
->when(
131131
$queue !== null,
132-
static fn ($query) => $query->where('queue', $queue)
132+
static fn ($query) => $query->where('queue', $queue),
133133
)
134134
->orderBy('failed_at', 'desc')
135135
->findAll();

src/Handlers/PredisHandler.php

+23-21
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace CodeIgniter\Queue\Handlers;
1515

16+
use CodeIgniter\Autoloader\FileLocator;
1617
use CodeIgniter\Exceptions\CriticalError;
1718
use CodeIgniter\I18n\Time;
1819
use CodeIgniter\Queue\Config\Queue as QueueConfig;
@@ -27,12 +28,20 @@
2728
class PredisHandler extends BaseHandler implements QueueInterface
2829
{
2930
private readonly Client $predis;
31+
private readonly string $luaScript;
3032

3133
public function __construct(protected QueueConfig $config)
3234
{
3335
try {
3436
$this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]);
3537
$this->predis->time();
38+
39+
$locator = new FileLocator(service('autoloader'));
40+
$luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua');
41+
if ($luaScript === false) {
42+
throw new CriticalError('Queue: LUA script for Predis is not available.');
43+
}
44+
$this->luaScript = file_get_contents($luaScript);
3645
} catch (Exception $e) {
3746
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
3847
}
@@ -77,30 +86,23 @@ public function push(string $queue, string $job, array $data): bool
7786
*/
7887
public function pop(string $queue, array $priorities): ?QueueJob
7988
{
80-
$tasks = [];
81-
$now = Time::now()->timestamp;
82-
83-
foreach ($priorities as $priority) {
84-
$tasks = $this->predis->zrangebyscore(
85-
"queues:{$queue}:{$priority}",
86-
'-inf',
87-
$now,
88-
['LIMIT' => [0, 1]]
89-
);
90-
if ($tasks !== []) {
91-
$removed = $this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks);
92-
if ($removed !== 0) {
93-
break;
94-
}
95-
$tasks = [];
96-
}
97-
}
89+
$now = (string) Time::now()->timestamp;
90+
91+
// Prepare the arguments for the Lua script
92+
$args = [
93+
'queues:' . $queue, // KEYS[1]
94+
$now, // ARGV[2]
95+
json_encode($priorities), // ARGV[3]
96+
];
97+
98+
// Execute the Lua script
99+
$task = $this->predis->eval($this->luaScript, 1, ...$args);
98100

99-
if ($tasks === []) {
101+
if ($task === null) {
100102
return null;
101103
}
102104

103-
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
105+
$queueJob = new QueueJob(json_decode((string) $task, true));
104106

105107
// Set the actual status as in DB.
106108
$queueJob->status = Status::RESERVED->value;
@@ -121,7 +123,7 @@ public function later(QueueJob $queueJob, int $seconds): bool
121123

122124
$result = $this->predis->zadd(
123125
"queues:{$queueJob->queue}:{$queueJob->priority}",
124-
[json_encode($queueJob) => $queueJob->available_at->timestamp]
126+
[json_encode($queueJob) => $queueJob->available_at->timestamp],
125127
);
126128
if ($result !== 0) {
127129
$this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);

src/Handlers/RedisHandler.php

+23-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace CodeIgniter\Queue\Handlers;
1515

16+
use CodeIgniter\Autoloader\FileLocator;
1617
use CodeIgniter\Exceptions\CriticalError;
1718
use CodeIgniter\I18n\Time;
1819
use CodeIgniter\Queue\Config\Queue as QueueConfig;
@@ -27,6 +28,7 @@
2728
class RedisHandler extends BaseHandler implements QueueInterface
2829
{
2930
private readonly Redis $redis;
31+
private readonly string $luaScript;
3032

3133
public function __construct(protected QueueConfig $config)
3234
{
@@ -48,6 +50,13 @@ public function __construct(protected QueueConfig $config)
4850
if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) {
4951
throw new CriticalError('Queue: Redis setting prefix failed.');
5052
}
53+
54+
$locator = new FileLocator(service('autoloader'));
55+
$luaScript = $locator->locateFile('CodeIgniter\Queue\Lua\pop_task', null, 'lua');
56+
if ($luaScript === false) {
57+
throw new CriticalError('Queue: LUA script for Redis is not available.');
58+
}
59+
$this->luaScript = file_get_contents($luaScript);
5160
} catch (RedisException $e) {
5261
throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').');
5362
}
@@ -96,23 +105,23 @@ public function push(string $queue, string $job, array $data): bool
96105
*/
97106
public function pop(string $queue, array $priorities): ?QueueJob
98107
{
99-
$tasks = [];
100-
$now = Time::now()->timestamp;
101-
102-
foreach ($priorities as $priority) {
103-
if ($tasks = $this->redis->zRangeByScore("queues:{$queue}:{$priority}", '-inf', (string) $now, ['limit' => [0, 1]])) {
104-
if ($this->redis->zRem("queues:{$queue}:{$priority}", ...$tasks)) {
105-
break;
106-
}
107-
$tasks = [];
108-
}
109-
}
108+
$now = Time::now()->timestamp;
109+
110+
// Prepare the arguments for the Lua script
111+
$args = [
112+
'queues:' . $queue, // KEYS[1]
113+
$now, // ARGV[2]
114+
json_encode($priorities), // ARGV[3]
115+
];
116+
117+
// Execute the Lua script
118+
$task = $this->redis->eval($this->luaScript, $args, 1);
110119

111-
if ($tasks === []) {
120+
if ($task === false) {
112121
return null;
113122
}
114123

115-
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
124+
$queueJob = new QueueJob(json_decode((string) $task, true));
116125

117126
// Set the actual status as in DB.
118127
$queueJob->status = Status::RESERVED->value;
@@ -136,7 +145,7 @@ public function later(QueueJob $queueJob, int $seconds): bool
136145
$result = (int) $this->redis->zAdd(
137146
"queues:{$queueJob->queue}:{$queueJob->priority}",
138147
$queueJob->available_at->timestamp,
139-
json_encode($queueJob)
148+
json_encode($queueJob),
140149
);
141150
if ($result !== 0) {
142151
$this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);

src/Lua/pop_task.lua

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
local queue = KEYS[1]
2+
local now = tonumber(ARGV[1])
3+
local priorities = cjson.decode(ARGV[2])
4+
local task = nil
5+
6+
for _, priority in ipairs(priorities) do
7+
local key = queue .. ':' .. priority
8+
local tasks = redis.call('ZRANGEBYSCORE', key, '-inf', tostring(now), 'LIMIT', 0, 1)
9+
10+
if #tasks > 0 then
11+
redis.call('ZREM', key, tasks[1])
12+
task = tasks[1]
13+
break
14+
end
15+
end
16+
17+
return task

src/Models/QueueJobModel.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -141,22 +141,22 @@ private function setPriority(BaseBuilder $builder, array $priority): BaseBuilder
141141
sprintf('CASE %s ', $this->db->protectIdentifiers('priority'))
142142
. implode(
143143
' ',
144-
array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority))
144+
array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority)),
145145
)
146146
. ' END',
147147
'',
148-
false
148+
false,
149149
);
150150
} else {
151151
$builder->orderBy(
152152
'FIELD(priority, '
153153
. implode(
154154
',',
155-
array_map(static fn ($value) => "'{$value}'", $priority)
155+
array_map(static fn ($value) => "'{$value}'", $priority),
156156
)
157157
. ')',
158158
'',
159-
false
159+
false,
160160
);
161161
}
162162
}

0 commit comments

Comments
 (0)