@@ -33,17 +33,41 @@ class PubSubQueue extends Queue implements QueueContract
33
33
*/
34
34
protected $ subscriber ;
35
35
36
+ /**
37
+ * Create topics automatically.
38
+ *
39
+ * @var bool
40
+ */
41
+ protected $ topicAutoCreation ;
42
+
43
+ /**
44
+ * Create subscriptions automatically.
45
+ *
46
+ * @var bool
47
+ */
48
+ protected $ subscriptionAutoCreation ;
49
+
50
+ /**
51
+ * Prepend all queue names with this prefix.
52
+ *
53
+ * @var string
54
+ */
55
+ protected $ queuePrefix = '' ;
56
+
36
57
/**
37
58
* Create a new GCP PubSub instance.
38
59
*
39
60
* @param \Google\Cloud\PubSub\PubSubClient $pubsub
40
61
* @param string $default
41
62
*/
42
- public function __construct (PubSubClient $ pubsub , $ default , $ subscriber = 'subscriber ' )
63
+ public function __construct (PubSubClient $ pubsub , $ default , $ subscriber = 'subscriber ' , $ topicAutoCreation = true , $ subscriptionAutoCreation = true , $ queuePrefix = '' )
43
64
{
44
65
$ this ->pubsub = $ pubsub ;
45
66
$ this ->default = $ default ;
46
67
$ this ->subscriber = $ subscriber ;
68
+ $ this ->topicAutoCreation = $ topicAutoCreation ;
69
+ $ this ->subscriptionAutoCreation = $ subscriptionAutoCreation ;
70
+ $ this ->queuePrefix = $ queuePrefix ;
47
71
}
48
72
49
73
/**
@@ -85,7 +109,7 @@ public function push($job, $data = '', $queue = null)
85
109
*/
86
110
public function pushRaw ($ payload , $ queue = null , array $ options = [])
87
111
{
88
- $ topic = $ this ->getTopic ($ queue , true );
112
+ $ topic = $ this ->getTopic ($ queue , $ this -> topicAutoCreation );
89
113
90
114
$ this ->subscribeToTopic ($ topic );
91
115
@@ -131,7 +155,7 @@ public function pop($queue = null)
131
155
{
132
156
$ topic = $ this ->getTopic ($ this ->getQueue ($ queue ));
133
157
134
- if (! $ topic ->exists ()) {
158
+ if ($ this -> topicAutoCreation && ! $ topic ->exists ()) {
135
159
return ;
136
160
}
137
161
@@ -179,7 +203,7 @@ public function bulk($jobs, $data = '', $queue = null)
179
203
$ payloads [] = ['data ' => base64_encode ($ payload )];
180
204
}
181
205
182
- $ topic = $ this ->getTopic ($ this ->getQueue ($ queue ), true );
206
+ $ topic = $ this ->getTopic ($ this ->getQueue ($ queue ), $ this -> topicAutoCreation );
183
207
184
208
$ this ->subscribeToTopic ($ topic );
185
209
@@ -274,7 +298,8 @@ public function getTopic($queue, $create = false)
274
298
$ queue = $ this ->getQueue ($ queue );
275
299
$ topic = $ this ->pubsub ->topic ($ queue );
276
300
277
- if (! $ topic ->exists () && $ create ) {
301
+ // don't check topic if automatic creation is not required, to avoid additional administrator operations calls
302
+ if ($ create && ! $ topic ->exists ()) {
278
303
$ topic ->create ();
279
304
}
280
305
@@ -292,7 +317,8 @@ public function subscribeToTopic(Topic $topic)
292
317
{
293
318
$ subscription = $ topic ->subscription ($ this ->getSubscriberName ());
294
319
295
- if (! $ subscription ->exists ()) {
320
+ // don't check subscription if automatic creation is not required, to avoid additional administrator operations calls
321
+ if ($ this ->subscriptionAutoCreation && ! $ subscription ->exists ()) {
296
322
$ subscription = $ topic ->subscribe ($ this ->getSubscriberName ());
297
323
}
298
324
@@ -329,7 +355,13 @@ public function getPubSub()
329
355
*/
330
356
public function getQueue ($ queue )
331
357
{
332
- return $ queue ?: $ this ->default ;
358
+ $ queue = $ queue ?: $ this ->default ;
359
+
360
+ if (! $ this ->queuePrefix || Str::startsWith ($ queue , $ this ->queuePrefix )) {
361
+ return $ queue ;
362
+ }
363
+
364
+ return $ this ->queuePrefix .$ queue ;
333
365
}
334
366
335
367
/**
0 commit comments