Skip to content

Commit 1f184bb

Browse files
Fix: Unique lock not being released after transaction rollback in ShouldBeUnique jobs with afterCommit() (#55420)
* add a callback after transaction rollback * for unique jobs, release the UniqueLock when a transaction rollback * fix code style
1 parent bcb3f58 commit 1f184bb

File tree

7 files changed

+290
-0
lines changed

7 files changed

+290
-0
lines changed

src/Illuminate/Database/DatabaseTransactionRecord.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ class DatabaseTransactionRecord
3232
*/
3333
protected $callbacks = [];
3434

35+
/**
36+
* The callbacks that should be executed after rollback.
37+
*
38+
* @var array
39+
*/
40+
protected $callbacksForRollback = [];
41+
3542
/**
3643
* Create a new database transaction record instance.
3744
*
@@ -57,6 +64,17 @@ public function addCallback($callback)
5764
$this->callbacks[] = $callback;
5865
}
5966

67+
/**
68+
* Register a callback to be executed after rollback.
69+
*
70+
* @param callable $callback
71+
* @return void
72+
*/
73+
public function addCallbackForRollback($callback)
74+
{
75+
$this->callbacksForRollback[] = $callback;
76+
}
77+
6078
/**
6179
* Execute all of the callbacks.
6280
*
@@ -69,6 +87,18 @@ public function executeCallbacks()
6987
}
7088
}
7189

90+
/**
91+
* Execute all of the callbacks for rollback.
92+
*
93+
* @return void
94+
*/
95+
public function executeCallbacksForRollback()
96+
{
97+
foreach ($this->callbacksForRollback as $callback) {
98+
$callback();
99+
}
100+
}
101+
72102
/**
73103
* Get all of the callbacks.
74104
*
@@ -78,4 +108,14 @@ public function getCallbacks()
78108
{
79109
return $this->callbacks;
80110
}
111+
112+
/**
113+
* Get all of the callbacks for rollback.
114+
*
115+
* @return array
116+
*/
117+
public function getCallbacksForRollback()
118+
{
119+
return $this->callbacksForRollback;
120+
}
81121
}

src/Illuminate/Database/DatabaseTransactionsManager.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ public function rollback($connection, $newTransactionLevel)
139139
do {
140140
$this->removeCommittedTransactionsThatAreChildrenOf($this->currentTransaction[$connection]);
141141

142+
$this->currentTransaction[$connection]->executeCallbacksForRollback();
143+
142144
$this->currentTransaction[$connection] = $this->currentTransaction[$connection]->parent;
143145
} while (
144146
isset($this->currentTransaction[$connection]) &&
@@ -156,6 +158,12 @@ public function rollback($connection, $newTransactionLevel)
156158
*/
157159
protected function removeAllTransactionsForConnection($connection)
158160
{
161+
if ($this->currentTransaction) {
162+
for ($currentTransaction = $this->currentTransaction[$connection]; isset($currentTransaction); $currentTransaction = $currentTransaction->parent) {
163+
$currentTransaction->executeCallbacksForRollback();
164+
}
165+
}
166+
159167
$this->currentTransaction[$connection] = null;
160168

161169
$this->pendingTransactions = $this->pendingTransactions->reject(
@@ -203,6 +211,19 @@ public function addCallback($callback)
203211
$callback();
204212
}
205213

214+
/**
215+
* Register a callback for transaction rollback.
216+
*
217+
* @param callable $callback
218+
* @return void
219+
*/
220+
public function addCallbackForRollback($callback)
221+
{
222+
if ($current = $this->callbackApplicableTransactions()->last()) {
223+
return $current->addCallbackForRollback($callback);
224+
}
225+
}
226+
206227
/**
207228
* Get the transactions that are applicable to callbacks.
208229
*

src/Illuminate/Queue/Queue.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
use Closure;
66
use DateTimeInterface;
7+
use Illuminate\Bus\UniqueLock;
78
use Illuminate\Container\Container;
9+
use Illuminate\Contracts\Cache\Repository as Cache;
810
use Illuminate\Contracts\Encryption\Encrypter;
911
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
12+
use Illuminate\Contracts\Queue\ShouldBeUnique;
1013
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
1114
use Illuminate\Queue\Events\JobQueued;
1215
use Illuminate\Queue\Events\JobQueueing;
@@ -324,6 +327,14 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
324327
{
325328
if ($this->shouldDispatchAfterCommit($job) &&
326329
$this->container->bound('db.transactions')) {
330+
if ($job instanceof ShouldBeUnique) {
331+
$this->container->make('db.transactions')->addCallbackForRollback(
332+
function () use ($job) {
333+
(new UniqueLock($this->container->make(Cache::class)))->release($job);
334+
}
335+
);
336+
}
337+
327338
return $this->container->make('db.transactions')->addCallback(
328339
function () use ($queue, $job, $payload, $delay, $callback) {
329340
$this->raiseJobQueueingEvent($queue, $job, $payload, $delay);

src/Illuminate/Queue/SyncQueue.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
namespace Illuminate\Queue;
44

5+
use Illuminate\Bus\UniqueLock;
6+
use Illuminate\Contracts\Cache\Repository as Cache;
57
use Illuminate\Contracts\Queue\Job;
68
use Illuminate\Contracts\Queue\Queue as QueueContract;
9+
use Illuminate\Contracts\Queue\ShouldBeUnique;
710
use Illuminate\Queue\Events\JobExceptionOccurred;
811
use Illuminate\Queue\Events\JobProcessed;
912
use Illuminate\Queue\Events\JobProcessing;
@@ -47,6 +50,14 @@ public function push($job, $data = '', $queue = null)
4750
{
4851
if ($this->shouldDispatchAfterCommit($job) &&
4952
$this->container->bound('db.transactions')) {
53+
if ($job instanceof ShouldBeUnique) {
54+
$this->container->make('db.transactions')->addCallbackForRollback(
55+
function () use ($job) {
56+
(new UniqueLock($this->container->make(Cache::class)))->release($job);
57+
}
58+
);
59+
}
60+
5061
return $this->container->make('db.transactions')->addCallback(
5162
fn () => $this->executeJob($job, $data, $queue)
5263
);

tests/Database/DatabaseTransactionsManagerTest.php

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,96 @@ public function testCallbackIsExecutedIfNoTransactions()
177177
$this->assertEquals(['default', 1], $callbacks[0]);
178178
}
179179

180+
public function testCallbacksForRollbackAreAddedToTheCurrentTransaction()
181+
{
182+
$callbacks = [];
183+
184+
$manager = (new DatabaseTransactionsManager);
185+
186+
$manager->begin('default', 1);
187+
188+
$manager->addCallbackForRollback(function () use (&$callbacks) {
189+
});
190+
191+
$manager->begin('default', 2);
192+
193+
$manager->begin('admin', 1);
194+
195+
$manager->addCallbackForRollback(function () use (&$callbacks) {
196+
});
197+
198+
$this->assertCount(1, $manager->getPendingTransactions()[0]->getCallbacksForRollback());
199+
$this->assertCount(0, $manager->getPendingTransactions()[1]->getCallbacksForRollback());
200+
$this->assertCount(1, $manager->getPendingTransactions()[2]->getCallbacksForRollback());
201+
}
202+
203+
public function testRollbackTransactionsExecutesCallbacks()
204+
{
205+
$callbacks = [];
206+
207+
$manager = (new DatabaseTransactionsManager);
208+
209+
$manager->begin('default', 1);
210+
211+
$manager->addCallbackForRollback(function () use (&$callbacks) {
212+
$callbacks[] = ['default', 1];
213+
});
214+
215+
$manager->begin('default', 2);
216+
217+
$manager->addCallbackForRollback(function () use (&$callbacks) {
218+
$callbacks[] = ['default', 2];
219+
});
220+
221+
$manager->begin('admin', 1);
222+
223+
$manager->rollback('default', 1);
224+
$manager->rollback('default', 0);
225+
226+
$this->assertCount(2, $callbacks);
227+
$this->assertEquals(['default', 2], $callbacks[0]);
228+
$this->assertEquals(['default', 1], $callbacks[1]);
229+
}
230+
231+
public function testRollbackExecutesOnlyCallbacksOfTheConnection()
232+
{
233+
$callbacks = [];
234+
235+
$manager = (new DatabaseTransactionsManager);
236+
237+
$manager->begin('default', 1);
238+
239+
$manager->addCallbackForRollback(function () use (&$callbacks) {
240+
$callbacks[] = ['default', 1];
241+
});
242+
243+
$manager->begin('default', 2);
244+
$manager->begin('admin', 1);
245+
246+
$manager->addCallbackForRollback(function () use (&$callbacks) {
247+
$callbacks[] = ['admin', 1];
248+
});
249+
250+
$manager->rollback('default', 1);
251+
$manager->rollback('default', 0);
252+
253+
$this->assertCount(1, $callbacks);
254+
$this->assertEquals(['default', 1], $callbacks[0]);
255+
}
256+
257+
public function testCallbackForRollbackIsNotExecutedIfNoTransactions()
258+
{
259+
$callbacks = [];
260+
261+
$manager = (new DatabaseTransactionsManager);
262+
263+
$manager->addCallbackForRollback(function () use (&$callbacks) {
264+
$callbacks[] = ['default', 1];
265+
});
266+
267+
$this->assertCount(0, $callbacks);
268+
}
269+
180270
public function testStageTransactions()
181271
{
182272
$manager = (new DatabaseTransactionsManager);

tests/Integration/Queue/QueueConnectionTest.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Illuminate\Tests\Integration\Queue;
44

55
use Illuminate\Bus\Queueable;
6+
use Illuminate\Contracts\Queue\ShouldBeUnique;
67
use Illuminate\Contracts\Queue\ShouldQueue;
78
use Illuminate\Database\DatabaseTransactionsManager;
89
use Illuminate\Foundation\Bus\Dispatchable;
@@ -19,6 +20,7 @@ class QueueConnectionTest extends TestCase
1920
protected function tearDown(): void
2021
{
2122
QueueConnectionTestJob::$ran = false;
23+
QueueConnectionTestUniqueJob::$ran = false;
2224

2325
parent::tearDown();
2426
}
@@ -28,6 +30,7 @@ public function testJobWontGetDispatchedInsideATransaction()
2830
$this->app->singleton('db.transactions', function () {
2931
$transactionManager = m::mock(DatabaseTransactionsManager::class);
3032
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
33+
$transactionManager->shouldNotReceive('addCallbackForRollback');
3134

3235
return $transactionManager;
3336
});
@@ -40,6 +43,7 @@ public function testJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicate
4043
$this->app->singleton('db.transactions', function () {
4144
$transactionManager = m::mock(DatabaseTransactionsManager::class);
4245
$transactionManager->shouldNotReceive('addCallback')->andReturn(null);
46+
$transactionManager->shouldNotReceive('addCallbackForRollback');
4347

4448
return $transactionManager;
4549
});
@@ -58,6 +62,7 @@ public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicate
5862
$this->app->singleton('db.transactions', function () {
5963
$transactionManager = m::mock(DatabaseTransactionsManager::class);
6064
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
65+
$transactionManager->shouldNotReceive('addCallbackForRollback');
6166

6267
return $transactionManager;
6368
});
@@ -68,6 +73,55 @@ public function testJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicate
6873
// This job was dispatched
6974
}
7075
}
76+
77+
public function testUniqueJobWontGetDispatchedInsideATransaction()
78+
{
79+
$this->app->singleton('db.transactions', function () {
80+
$transactionManager = m::mock(DatabaseTransactionsManager::class);
81+
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
82+
$transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null);
83+
84+
return $transactionManager;
85+
});
86+
87+
Bus::dispatch(new QueueConnectionTestUniqueJob);
88+
}
89+
90+
public function testUniqueJobWillGetDispatchedInsideATransactionWhenExplicitlyIndicated()
91+
{
92+
$this->app->singleton('db.transactions', function () {
93+
$transactionManager = m::mock(DatabaseTransactionsManager::class);
94+
$transactionManager->shouldNotReceive('addCallback')->andReturn(null);
95+
$transactionManager->shouldNotReceive('addCallbackForRollback')->andReturn(null);
96+
97+
return $transactionManager;
98+
});
99+
100+
try {
101+
Bus::dispatch((new QueueConnectionTestUniqueJob)->beforeCommit());
102+
} catch (Throwable) {
103+
// This job was dispatched
104+
}
105+
}
106+
107+
public function testUniqueJobWontGetDispatchedInsideATransactionWhenExplicitlyIndicated()
108+
{
109+
$this->app['config']->set('queue.connections.sqs.after_commit', false);
110+
111+
$this->app->singleton('db.transactions', function () {
112+
$transactionManager = m::mock(DatabaseTransactionsManager::class);
113+
$transactionManager->shouldReceive('addCallback')->once()->andReturn(null);
114+
$transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null);
115+
116+
return $transactionManager;
117+
});
118+
119+
try {
120+
Bus::dispatch((new QueueConnectionTestUniqueJob)->afterCommit());
121+
} catch (SqsException) {
122+
// This job was dispatched
123+
}
124+
}
71125
}
72126

73127
class QueueConnectionTestJob implements ShouldQueue
@@ -81,3 +135,15 @@ public function handle()
81135
static::$ran = true;
82136
}
83137
}
138+
139+
class QueueConnectionTestUniqueJob implements ShouldQueue, ShouldBeUnique
140+
{
141+
use Dispatchable, Queueable;
142+
143+
public static $ran = false;
144+
145+
public function handle()
146+
{
147+
static::$ran = true;
148+
}
149+
}

0 commit comments

Comments
 (0)