Skip to content

Commit d425493

Browse files
committed
Fix queue support #623
1 parent b0354f2 commit d425493

File tree

5 files changed

+126
-1
lines changed

5 files changed

+126
-1
lines changed

src/Jenssegers/Mongodb/MongodbServiceProvider.php

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php namespace Jenssegers\Mongodb;
22

33
use Illuminate\Support\ServiceProvider;
4-
use Jenssegers\Eloquent\Model;
4+
use Jenssegers\Mongodb\Queue\MongoConnector;
55

66
class MongodbServiceProvider extends ServiceProvider
77
{
@@ -20,10 +20,18 @@ public function boot()
2020
*/
2121
public function register()
2222
{
23+
// Add database driver.
2324
$this->app->resolving('db', function ($db) {
2425
$db->extend('mongodb', function ($config) {
2526
return new Connection($config);
2627
});
2728
});
29+
30+
// Add connector for queue support.
31+
$this->app->resolving('queue', function ($queue) {
32+
$queue->addConnector('mongodb', function () {
33+
return new MongoConnector($this->app['db']);
34+
});
35+
});
2836
}
2937
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php namespace Jenssegers\Mongodb\Queue;
2+
3+
use Illuminate\Database\ConnectionResolverInterface;
4+
use Illuminate\Queue\Connectors\ConnectorInterface;
5+
use Illuminate\Support\Arr;
6+
7+
class MongoConnector implements ConnectorInterface
8+
{
9+
/**
10+
* Database connections.
11+
*
12+
* @var \Illuminate\Database\ConnectionResolverInterface
13+
*/
14+
protected $connections;
15+
16+
/**
17+
* Create a new connector instance.
18+
*
19+
* @param \Illuminate\Database\ConnectionResolverInterface $connections
20+
* @return void
21+
*/
22+
public function __construct(ConnectionResolverInterface $connections)
23+
{
24+
$this->connections = $connections;
25+
}
26+
27+
/**
28+
* Establish a queue connection.
29+
*
30+
* @param array $config
31+
* @return \Illuminate\Contracts\Queue\Queue
32+
*/
33+
public function connect(array $config)
34+
{
35+
return new MongoQueue(
36+
$this->connections->connection(Arr::get($config, 'connection')),
37+
$config['table'],
38+
$config['queue'],
39+
Arr::get($config, 'expire', 60)
40+
);
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php namespace Jenssegers\Mongodb\Queue;
2+
3+
use Carbon\Carbon;
4+
use Illuminate\Queue\DatabaseQueue;
5+
6+
class MongoQueue extends DatabaseQueue
7+
{
8+
/**
9+
* Get the next available job for the queue.
10+
*
11+
* @param string|null $queue
12+
* @return \StdClass|null
13+
*/
14+
protected function getNextAvailableJob($queue)
15+
{
16+
$job = $this->database->table($this->table)
17+
->lockForUpdate()
18+
->where('queue', $this->getQueue($queue))
19+
->where('reserved', 0)
20+
->where('available_at', '<=', $this->getTime())
21+
->orderBy('id', 'asc')
22+
->first();
23+
24+
if ($job) {
25+
$job = (object) $job;
26+
$job->id = $job->_id;
27+
}
28+
29+
return $job ?: null;
30+
}
31+
32+
/**
33+
* Release the jobs that have been reserved for too long.
34+
*
35+
* @param string $queue
36+
* @return void
37+
*/
38+
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
39+
{
40+
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
41+
42+
$reserved = $this->database->collection($this->table)
43+
->where('queue', $this->getQueue($queue))
44+
->where('reserved', 1)
45+
->where('reserved_at', '<=', $expired)->get();
46+
47+
foreach ($reserved as $job) {
48+
$attempts = $job['attempts'] + 1;
49+
$this->releaseJob($job['_id'], $attempts);
50+
}
51+
}
52+
}

tests/QueueTest.php

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
class QueueTest extends TestCase
4+
{
5+
public function testQueue()
6+
{
7+
$id = Queue::push('test', ['foo' => 'bar'], 'test');
8+
$this->assertNotNull($id);
9+
10+
$job = Queue::pop('test');
11+
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
12+
}
13+
}

tests/TestCase.php

+10
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,22 @@ protected function getEnvironmentSetUp($app)
2929

3030
$config = require 'config/database.php';
3131

32+
$app['config']->set('app.key', 'ZsZewWyUJ5FsKp9lMwv4tYbNlegQilM7');
33+
3234
$app['config']->set('database.default', 'mongodb');
3335
$app['config']->set('database.connections.mysql', $config['connections']['mysql']);
3436
$app['config']->set('database.connections.mongodb', $config['connections']['mongodb']);
3537

3638
$app['config']->set('auth.model', 'User');
3739
$app['config']->set('auth.providers.users.model', 'User');
3840
$app['config']->set('cache.driver', 'array');
41+
42+
$app['config']->set('queue.default', 'mongodb');
43+
$app['config']->set('queue.connections.mongodb', [
44+
'driver' => 'mongodb',
45+
'table' => 'jobs',
46+
'queue' => 'default',
47+
'expire' => 60,
48+
]);
3949
}
4050
}

0 commit comments

Comments
 (0)