Skip to content

Commit 68bfabb

Browse files
vadimonusVadim Dvorovenkotaylorotwell
authored
[11.x] Job Batches with Redis Cluster (#54522)
* Job Batches with Redis Cluster * Update RedisQueue.php --------- Co-authored-by: Vadim Dvorovenko <[email protected]> Co-authored-by: Taylor Otwell <[email protected]>
1 parent 814054e commit 68bfabb

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

src/Illuminate/Queue/RedisQueue.php

+20-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use Illuminate\Contracts\Queue\Queue as QueueContract;
77
use Illuminate\Contracts\Redis\Factory as Redis;
88
use Illuminate\Queue\Jobs\RedisJob;
9+
use Illuminate\Redis\Connections\PhpRedisClusterConnection;
10+
use Illuminate\Redis\Connections\PredisClusterConnection;
911
use Illuminate\Support\Str;
1012

1113
class RedisQueue extends Queue implements QueueContract, ClearableQueue
@@ -118,17 +120,25 @@ public function size($queue = null)
118120
*/
119121
public function bulk($jobs, $data = '', $queue = null)
120122
{
121-
$this->getConnection()->pipeline(function () use ($jobs, $data, $queue) {
122-
$this->getConnection()->transaction(function () use ($jobs, $data, $queue) {
123-
foreach ((array) $jobs as $job) {
124-
if (isset($job->delay)) {
125-
$this->later($job->delay, $job, $data, $queue);
126-
} else {
127-
$this->push($job, $data, $queue);
128-
}
123+
$connection = $this->getConnection();
124+
125+
$bulk = function () use ($jobs, $data, $queue) {
126+
foreach ((array) $jobs as $job) {
127+
if (isset($job->delay)) {
128+
$this->later($job->delay, $job, $data, $queue);
129+
} else {
130+
$this->push($job, $data, $queue);
129131
}
130-
});
131-
});
132+
}
133+
};
134+
135+
if ($connection instanceof PhpRedisClusterConnection) {
136+
$connection->transaction($bulk);
137+
} elseif ($connection instanceof PredisClusterConnection) {
138+
$connection->pipeline($bulk);
139+
} else {
140+
$connection->pipeline(fn () => $connection->transaction($bulk));
141+
}
132142
}
133143

134144
/**

tests/Integration/Queue/RedisQueueTest.php

-7
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
use Illuminate\Queue\Events\JobQueueing;
1010
use Illuminate\Queue\Jobs\RedisJob;
1111
use Illuminate\Queue\RedisQueue;
12-
use Illuminate\Redis\Connections\PhpRedisClusterConnection;
13-
use Illuminate\Redis\Connections\PredisClusterConnection;
1412
use Illuminate\Support\InteractsWithTime;
1513
use Illuminate\Support\Str;
1614
use Mockery as m;
@@ -517,11 +515,6 @@ public function testPushJobQueueingAndJobQueuedEvents($driver)
517515
#[DataProvider('redisDriverProvider')]
518516
public function testBulkJobQueuedEvent($driver)
519517
{
520-
if ($this->redis[$driver]->connection() instanceof PhpRedisClusterConnection
521-
|| $this->redis[$driver]->connection() instanceof PredisClusterConnection
522-
) {
523-
$this->markTestSkipped('RedisQueue::bulk currently does not support cluster connections');
524-
}
525518
$events = m::mock(Dispatcher::class);
526519
$events->shouldReceive('dispatch')->with(m::type(JobQueueing::class))->andReturnNull()->times(3);
527520
$events->shouldReceive('dispatch')->with(m::type(JobQueued::class))->andReturnNull()->times(3);

0 commit comments

Comments
 (0)