forked from php-amqplib/RabbitMqBundle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBaseAmqp.php
108 lines (93 loc) · 2.52 KB
/
BaseAmqp.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<?php
namespace OldSound\RabbitMqBundle\RabbitMq;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;
/**
 * Shared state for AMQP clients built on a connection and a channel.
 *
 * Keeps the connection, the channel, a consumer tag, and the exchange, queue
 * and routing-key settings that concrete subclasses read.
 */
abstract class BaseAmqp
{
    /** @var AMQPConnection Connection handed to the constructor. */
    protected $conn;

    /** @var AMQPChannel Channel in use; supplied by the caller or opened from $conn. */
    protected $ch;

    /** @var string Consumer tag; defaults to "PHPPROCESS_<hostname>_<pid>". */
    protected $consumerTag;

    /**
     * Exchange option defaults. Caller-supplied options are merged over these
     * by setExchangeOptions(), which also adds the mandatory 'name' and 'type'.
     *
     * @var array
     */
    protected $exchangeOptions = array(
        'passive' => false,
        'durable' => true,
        'auto_delete' => false,
        'internal' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /**
     * Queue option defaults. Caller-supplied options are merged over these by
     * setQueueOptions().
     *
     * @var array
     */
    protected $queueOptions = array(
        'name' => '',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null
    );

    /** @var string Routing key; empty until setRoutingKey() is called. */
    protected $routingKey = '';

    /**
     * @param AMQPConnection   $conn        Connection to work on.
     * @param AMQPChannel|null $ch          Channel to use; when omitted a new one
     *                                      is opened via $conn->channel().
     * @param string|null      $consumerTag Consumer tag; when omitted (null, '' or
     *                                      another empty value) a tag of the form
     *                                      "PHPPROCESS_<hostname>_<pid>" is generated.
     */
    public function __construct(AMQPConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
    {
        $this->conn = $conn;
        $this->ch = empty($ch) ? $this->conn->channel() : $ch;

        // empty('0') is true in PHP, but '0' is a caller-chosen tag and must not
        // be overwritten by the generated default.
        $tagMissing = empty($consumerTag) && $consumerTag !== '0';
        $this->consumerTag = $tagMissing
            ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid())
            : $consumerTag;
    }

    /**
     * Intentionally does nothing.
     *
     * TODO: the teardown that used to live here (close $this->ch, then close
     * $this->conn) is disabled pending a fix, so neither the channel nor the
     * connection is closed when this object is destroyed.
     */
    public function __destruct()
    {
    }

    /**
     * Replaces the channel in use.
     *
     * @param AMQPChannel $ch
     * @return void
     */
    public function setChannel(AMQPChannel $ch)
    {
        $this->ch = $ch;
    }

    /**
     * Merges the given options over the current exchange options.
     *
     * 'name' and 'type' are mandatory. The name "0" is accepted: it is a
     * non-empty string even though PHP's empty() reports it as empty.
     *
     * @throws \InvalidArgumentException when 'name' or 'type' is missing or empty
     * @param array $options
     * @return void
     */
    public function setExchangeOptions(array $options = array())
    {
        $name = isset($options['name']) ? $options['name'] : null;
        if (empty($name) && $name !== '0') {
            throw new \InvalidArgumentException('You must provide an exchange name');
        }

        if (empty($options['type'])) {
            throw new \InvalidArgumentException('You must provide an exchange type');
        }

        $this->exchangeOptions = array_merge($this->exchangeOptions, $options);
    }

    /**
     * Merges the given options over the current queue options.
     *
     * @param array $options
     * @return void
     */
    public function setQueueOptions(array $options = array())
    {
        $this->queueOptions = array_merge($this->queueOptions, $options);
    }

    /**
     * Sets the routing key.
     *
     * @param string $routingKey
     * @return void
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;
    }
}