Skip to content

Commit cd355cb

Browse files
authored
Merge pull request #786 from Bodom78/master
Fixes #743 for version 3.x
2 parents b1953fd + 99fad17 commit cd355cb

File tree

3 files changed

+119
-31
lines changed

3 files changed

+119
-31
lines changed

src/Jenssegers/Mongodb/Queue/MongoQueue.php

+60-29
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,75 @@
22

33
use Carbon\Carbon;
44
use Illuminate\Queue\DatabaseQueue;
5+
use Illuminate\Queue\Jobs\DatabaseJob;
6+
use MongoDB\Operation\FindOneAndUpdate;
7+
use DB;
58

69
class MongoQueue extends DatabaseQueue
710
{
811
/**
9-
* Get the next available job for the queue.
12+
* Pop the next job off of the queue.
13+
*
14+
* @param string $queue
15+
*
16+
* @return \Illuminate\Contracts\Queue\Job|null
17+
*/
18+
public function pop($queue = null)
19+
{
20+
$queue = $this->getQueue($queue);
21+
22+
if (!is_null($this->expire)) {
23+
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
24+
}
25+
26+
if ($job = $this->getNextAvailableJobAndReserve($queue)) {
27+
return new DatabaseJob(
28+
$this->container, $this, $job, $queue
29+
);
30+
}
31+
}
32+
33+
/**
34+
* Get the next available job for the queue and mark it as reserved.
35+
*
36+
* When using multiple daemon queue listeners to process jobs there
37+
* is a possibility that multiple processes can end up reading the
38+
* same record before one has flagged it as reserved.
39+
*
40+
* This race condition can result in random jobs being run more then
41+
* once. To solve this we use findOneAndUpdate to lock the next jobs
42+
* record while flagging it as reserved at the same time.
43+
*
44+
* @param string|null $queue
1045
*
11-
* @param string|null $queue
1246
* @return \StdClass|null
1347
*/
14-
protected function getNextAvailableJob($queue)
48+
protected function getNextAvailableJobAndReserve($queue)
1549
{
16-
$job = $this->database->table($this->table)
17-
->lockForUpdate()
18-
->where('queue', $this->getQueue($queue))
19-
->where('reserved', 0)
20-
->where('available_at', '<=', $this->getTime())
21-
->orderBy('id', 'asc')
22-
->first();
50+
$job = DB::getCollection($this->table)->findOneAndUpdate(
51+
[
52+
'queue' => $this->getQueue($queue),
53+
'reserved' => 0,
54+
'available_at' => ['$lte' => $this->getTime()],
55+
56+
],
57+
[
58+
'$set' => [
59+
'reserved' => 1,
60+
'reserved_at' => $this->getTime(),
61+
],
62+
],
63+
[
64+
'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
65+
'sort' => ['available_at' => 1],
66+
]
67+
);
2368

2469
if ($job) {
25-
$job = (object) $job;
2670
$job->id = $job->_id;
2771
}
2872

29-
return $job ?: null;
73+
return $job;
3074
}
3175

3276
/**
@@ -40,16 +84,16 @@ protected function releaseJobsThatHaveBeenReservedTooLong($queue)
4084
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
4185

4286
$reserved = $this->database->collection($this->table)
43-
->where('queue', $this->getQueue($queue))
44-
->where('reserved', 1)
45-
->where('reserved_at', '<=', $expired)->get();
87+
->where('queue', $this->getQueue($queue))
88+
->where('reserved', 1)
89+
->where('reserved_at', '<=', $expired)->get();
4690

4791
foreach ($reserved as $job) {
4892
$attempts = $job['attempts'] + 1;
4993
$this->releaseJob($job['_id'], $attempts);
5094
}
5195
}
52-
96+
5397
/**
5498
* Release the given job ID from reservation.
5599
*
@@ -66,19 +110,6 @@ protected function releaseJob($id, $attempts)
66110
]);
67111
}
68112

69-
/**
70-
* Mark the given job ID as reserved.
71-
*
72-
* @param string $id
73-
* @return void
74-
*/
75-
protected function markJobAsReserved($id)
76-
{
77-
$this->database->collection($this->table)->where('_id', $id)->update([
78-
'reserved' => 1, 'reserved_at' => $this->getTime(),
79-
]);
80-
}
81-
82113
/**
83114
* Delete a reserved job from the queue.
84115
*

tests/QueueTest.php

+36-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,46 @@
22

33
class QueueTest extends TestCase
44
{
5-
public function testQueue()
5+
public function setUp()
66
{
7-
$id = Queue::push('test', ['foo' => 'bar'], 'test');
7+
parent::setUp();
8+
9+
// Always start with a clean slate
10+
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->truncate();
11+
Queue::getDatabase()->table(Config::get('queue.failed.table'))->truncate();
12+
}
13+
14+
public function testQueueJobLifeCycle()
15+
{
16+
$id = Queue::push('test', ['action' => 'QueueJobLifeCycle'], 'test');
817
$this->assertNotNull($id);
918

19+
// Get and reserve the test job (next available)
1020
$job = Queue::pop('test');
1121
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
22+
$this->assertEquals(1, $job->getDatabaseJob()->reserved);
23+
$this->assertEquals(json_encode(['job' => 'test', 'data' => ['action' => 'QueueJobLifeCycle']]), $job->getRawBody());
24+
25+
// Remove reserved job
26+
$job->delete();
27+
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
28+
}
29+
30+
public function testQueueJobExpired()
31+
{
32+
$id = Queue::push('test', ['action' => 'QueueJobExpired'], 'test');
33+
$this->assertNotNull($id);
34+
35+
// Expire the test job
36+
$expiry = \Carbon\Carbon::now()->subSeconds(Config::get('queue.connections.database.expire'))->getTimestamp();
37+
Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->where('_id', $id)->update(['reserved' => 1, 'reserved_at' => $expiry]);
38+
39+
// Expect an attempted older job in the queue
40+
$job = Queue::pop('test');
41+
$this->assertEquals(2, $job->getDatabaseJob()->attempts);
42+
$this->assertGreaterThan($expiry, $job->getDatabaseJob()->reserved_at);
43+
44+
$job->delete();
45+
$this->assertEquals(0, Queue::getDatabase()->table(Config::get('queue.connections.database.table'))->count());
1246
}
1347
}

tests/config/queue.php

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
return [
4+
5+
'default' => 'database',
6+
7+
'connections' => [
8+
9+
'database' => [
10+
'driver' => 'mongodb',
11+
'table' => 'jobs',
12+
'queue' => 'default',
13+
'expire' => 60,
14+
],
15+
16+
],
17+
18+
'failed' => [
19+
'database' => 'mongodb',
20+
'table' => 'failed_jobs',
21+
],
22+
23+
];

0 commit comments

Comments
 (0)