Skip to content

Commit df5e3a1

Browse files
committed
Fix queue support #623
1 parent 1d0f18c commit df5e3a1

File tree

5 files changed

+119
-0
lines changed

5 files changed

+119
-0
lines changed

src/Jenssegers/Mongodb/MongodbServiceProvider.php

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php namespace Jenssegers\Mongodb;
22

33
use Illuminate\Support\ServiceProvider;
4+
use Jenssegers\Mongodb\Queue\MongoConnector;
45

56
class MongodbServiceProvider extends ServiceProvider
67
{
@@ -19,10 +20,18 @@ public function boot()
1920
*/
2021
public function register()
2122
{
23+
// Add database driver.
2224
$this->app->resolving('db', function ($db) {
2325
$db->extend('mongodb', function ($config) {
2426
return new Connection($config);
2527
});
2628
});
29+
30+
// Add connector for queue support.
31+
$this->app->resolving('queue', function ($queue) {
32+
$queue->addConnector('mongodb', function () {
33+
return new MongoConnector($this->app['db']);
34+
});
35+
});
2736
}
2837
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php namespace Jenssegers\Mongodb\Queue;
2+
3+
use Illuminate\Database\ConnectionResolverInterface;
4+
use Illuminate\Queue\Connectors\ConnectorInterface;
5+
use Illuminate\Support\Arr;
6+
7+
class MongoConnector implements ConnectorInterface
8+
{
9+
/**
10+
* Database connections.
11+
*
12+
* @var \Illuminate\Database\ConnectionResolverInterface
13+
*/
14+
protected $connections;
15+
16+
/**
17+
* Create a new connector instance.
18+
*
19+
* @param \Illuminate\Database\ConnectionResolverInterface $connections
20+
* @return void
21+
*/
22+
public function __construct(ConnectionResolverInterface $connections)
23+
{
24+
$this->connections = $connections;
25+
}
26+
27+
/**
28+
* Establish a queue connection.
29+
*
30+
* @param array $config
31+
* @return \Illuminate\Contracts\Queue\Queue
32+
*/
33+
public function connect(array $config)
34+
{
35+
return new MongoQueue(
36+
$this->connections->connection(Arr::get($config, 'connection')),
37+
$config['table'],
38+
$config['queue'],
39+
Arr::get($config, 'expire', 60)
40+
);
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php namespace Jenssegers\Mongodb\Queue;
2+
3+
use Carbon\Carbon;
4+
use Illuminate\Queue\DatabaseQueue;
5+
6+
class MongoQueue extends DatabaseQueue
7+
{
8+
/**
9+
* Get the next available job for the queue.
10+
*
11+
* @param string|null $queue
12+
* @return \StdClass|null
13+
*/
14+
protected function getNextAvailableJob($queue)
15+
{
16+
$job = parent::getNextAvailableJob($queue);
17+
18+
if ($job) {
19+
$job->id = $job->_id;
20+
}
21+
22+
return $job;
23+
}
24+
25+
/**
26+
* Release the jobs that have been reserved for too long.
27+
*
28+
* @param string $queue
29+
* @return void
30+
*/
31+
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
32+
{
33+
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
34+
35+
$reserved = $this->database->collection($this->table)
36+
->where('queue', $this->getQueue($queue))
37+
->where('reserved', 1)
38+
->where('reserved_at', '<=', $expired)->get();
39+
40+
foreach ($reserved as $job) {
41+
$attempts = $job['attempts'] + 1;
42+
$this->releaseJob($job['_id'], $attempts);
43+
}
44+
}
45+
}

tests/QueueTest.php

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
class QueueTest extends TestCase
4+
{
5+
public function testQueue()
6+
{
7+
$id = Queue::push('test', ['foo' => 'bar'], 'test');
8+
$this->assertNotNull($id);
9+
10+
$job = Queue::pop('test');
11+
$this->assertInstanceOf('Illuminate\Queue\Jobs\DatabaseJob', $job);
12+
}
13+
}

tests/TestCase.php

+10
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,21 @@ protected function getEnvironmentSetUp($app)
2929

3030
$config = require 'config/database.php';
3131

32+
$app['config']->set('app.key', 'ZsZewWyUJ5FsKp9lMwv4tYbNlegQilM7');
33+
3234
$app['config']->set('database.default', 'mongodb');
3335
$app['config']->set('database.connections.mysql', $config['connections']['mysql']);
3436
$app['config']->set('database.connections.mongodb', $config['connections']['mongodb']);
3537

3638
$app['config']->set('auth.model', 'User');
3739
$app['config']->set('cache.driver', 'array');
40+
41+
$app['config']->set('queue.default', 'mongodb');
42+
$app['config']->set('queue.connections.mongodb', [
43+
'driver' => 'mongodb',
44+
'table' => 'jobs',
45+
'queue' => 'default',
46+
'expire' => 60,
47+
]);
3848
}
3949
}

0 commit comments

Comments
 (0)