4
4
5
5
use Interop \Queue \PsrConnectionFactory ;
6
6
use PhpAmqpLib \Connection \AbstractConnection ;
7
+ use PhpAmqpLib \Connection \AMQPLazyConnection ;
8
+ use PhpAmqpLib \Connection \AMQPLazySocketConnection ;
9
+ use PhpAmqpLib \Connection \AMQPSocketConnection ;
7
10
use PhpAmqpLib \Connection \AMQPStreamConnection ;
8
11
9
12
class AmqpConnectionFactory implements PsrConnectionFactory
@@ -19,10 +22,35 @@ class AmqpConnectionFactory implements PsrConnectionFactory
19
22
private $ connection ;
20
23
21
24
/**
22
- * @param array $config
25
+ * The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
26
+ *
27
+ * [
28
+ * 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.',
29
+ * 'port' => 'amqp.port Port on the host.',
30
+ * 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.',
31
+ * 'user' => 'amqp.user The user name to use. Note: Max 128 characters.',
32
+ * 'pass' => 'amqp.password Password. Note: Max 128 characters.',
33
+ * 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
34
+ * 'stream' => 'stream or socket connection',
35
+ * ]
36
+ *
37
+ * or
38
+ *
39
+ * amqp://user:pass@host:10000/vhost?lazy=true&socket=true
40
+ *
41
+ * @param array|string $config
23
42
*/
24
- public function __construct (array $ config = [] )
43
+ public function __construct ($ config = ' amqp:// ' )
25
44
{
45
+ if (empty ($ config ) || 'amqp:// ' === $ config ) {
46
+ $ config = [];
47
+ } elseif (is_string ($ config )) {
48
+ $ config = $ this ->parseDsn ($ config );
49
+ } elseif (is_array ($ config )) {
50
+ } else {
51
+ throw new \LogicException ('The config must be either an array of options, a DSN string or null ' );
52
+ }
53
+
26
54
$ this ->config = array_replace ($ this ->defaultConfig (), $ config );
27
55
}
28
56
@@ -40,29 +68,152 @@ public function createContext()
40
68
private function establishConnection ()
41
69
{
42
70
if (false == $ this ->connection ) {
43
- $ this ->connection = new AMQPStreamConnection (
44
- $ this ->config ['host ' ],
45
- $ this ->config ['port ' ],
46
- $ this ->config ['user ' ],
47
- $ this ->config ['pass ' ],
48
- $ this ->config ['vhost ' ]
49
- );
71
+ if ($ this ->config ['stream ' ]) {
72
+ if ($ this ->config ['lazy ' ]) {
73
+ $ con = new AMQPLazyConnection (
74
+ $ this ->config ['host ' ],
75
+ $ this ->config ['port ' ],
76
+ $ this ->config ['user ' ],
77
+ $ this ->config ['pass ' ],
78
+ $ this ->config ['vhost ' ],
79
+ $ this ->config ['insist ' ],
80
+ $ this ->config ['login_method ' ],
81
+ $ this ->config ['login_response ' ],
82
+ $ this ->config ['locale ' ],
83
+ $ this ->config ['connection_timeout ' ],
84
+ $ this ->config ['read_write_timeout ' ],
85
+ null ,
86
+ $ this ->config ['keepalive ' ],
87
+ $ this ->config ['heartbeat ' ]
88
+ );
89
+ } else {
90
+ $ con = new AMQPStreamConnection (
91
+ $ this ->config ['host ' ],
92
+ $ this ->config ['port ' ],
93
+ $ this ->config ['user ' ],
94
+ $ this ->config ['pass ' ],
95
+ $ this ->config ['vhost ' ],
96
+ $ this ->config ['insist ' ],
97
+ $ this ->config ['login_method ' ],
98
+ $ this ->config ['login_response ' ],
99
+ $ this ->config ['locale ' ],
100
+ $ this ->config ['connection_timeout ' ],
101
+ $ this ->config ['read_write_timeout ' ],
102
+ null ,
103
+ $ this ->config ['keepalive ' ],
104
+ $ this ->config ['heartbeat ' ]
105
+ );
106
+ }
107
+ } else {
108
+ if ($ this ->config ['lazy ' ]) {
109
+ $ con = new AMQPLazySocketConnection (
110
+ $ this ->config ['host ' ],
111
+ $ this ->config ['port ' ],
112
+ $ this ->config ['user ' ],
113
+ $ this ->config ['pass ' ],
114
+ $ this ->config ['vhost ' ],
115
+ $ this ->config ['insist ' ],
116
+ $ this ->config ['login_method ' ],
117
+ $ this ->config ['login_response ' ],
118
+ $ this ->config ['locale ' ],
119
+ $ this ->config ['read_timeout ' ],
120
+ $ this ->config ['keepalive ' ],
121
+ $ this ->config ['write_timeout ' ],
122
+ $ this ->config ['heartbeat ' ]
123
+ );
124
+ } else {
125
+ $ con = new AMQPSocketConnection (
126
+ $ this ->config ['host ' ],
127
+ $ this ->config ['port ' ],
128
+ $ this ->config ['user ' ],
129
+ $ this ->config ['pass ' ],
130
+ $ this ->config ['vhost ' ],
131
+ $ this ->config ['insist ' ],
132
+ $ this ->config ['login_method ' ],
133
+ $ this ->config ['login_response ' ],
134
+ $ this ->config ['locale ' ],
135
+ $ this ->config ['read_timeout ' ],
136
+ $ this ->config ['keepalive ' ],
137
+ $ this ->config ['write_timeout ' ],
138
+ $ this ->config ['heartbeat ' ]
139
+ );
140
+ }
141
+ }
142
+
143
+ $ this ->connection = $ con ;
50
144
}
51
145
52
146
return $ this ->connection ;
53
147
}
54
148
149
+ /**
150
+ * @param string $dsn
151
+ *
152
+ * @return array
153
+ */
154
+ private function parseDsn ($ dsn )
155
+ {
156
+ $ dsnConfig = parse_url ($ dsn );
157
+ if (false === $ dsnConfig ) {
158
+ throw new \LogicException (sprintf ('Failed to parse DSN "%s" ' , $ dsn ));
159
+ }
160
+
161
+ $ dsnConfig = array_replace ([
162
+ 'scheme ' => null ,
163
+ 'host ' => null ,
164
+ 'port ' => null ,
165
+ 'user ' => null ,
166
+ 'pass ' => null ,
167
+ 'path ' => null ,
168
+ 'query ' => null ,
169
+ ], $ dsnConfig );
170
+
171
+ if ('amqp ' !== $ dsnConfig ['scheme ' ]) {
172
+ throw new \LogicException (sprintf ('The given DSN scheme "%s" is not supported. Could be "amqp" only. ' , $ dsnConfig ['scheme ' ]));
173
+ }
174
+
175
+ if ($ dsnConfig ['query ' ]) {
176
+ $ query = [];
177
+ parse_str ($ dsnConfig ['query ' ], $ query );
178
+
179
+ $ dsnConfig = array_replace ($ query , $ dsnConfig );
180
+ }
181
+
182
+ $ dsnConfig ['vhost ' ] = ltrim ($ dsnConfig ['path ' ], '/ ' );
183
+
184
+ unset($ dsnConfig ['scheme ' ], $ dsnConfig ['query ' ], $ dsnConfig ['fragment ' ], $ dsnConfig ['path ' ]);
185
+
186
+ $ config = array_replace ($ this ->defaultConfig (), $ dsnConfig );
187
+ $ config = array_map (function ($ value ) {
188
+ return urldecode ($ value );
189
+ }, $ config );
190
+
191
+ return $ config ;
192
+ }
193
+
55
194
/**
56
195
* @return array
57
196
*/
58
197
private function defaultConfig ()
59
198
{
60
199
return [
200
+ 'stream ' => true ,
201
+ 'lazy ' => true ,
61
202
'host ' => 'localhost ' ,
62
203
'port ' => 5672 ,
63
- 'vhost ' => '/ ' ,
64
204
'user ' => 'guest ' ,
65
205
'pass ' => 'guest ' ,
206
+ 'vhost ' => '/ ' ,
207
+ 'insist ' => false ,
208
+ 'login_method ' => 'AMQPLAIN ' ,
209
+ 'login_response ' => null ,
210
+ 'locale ' => 'en_US ' ,
211
+ 'read_timeout ' => 3 ,
212
+ 'keepalive ' => false ,
213
+ 'write_timeout ' => 3 ,
214
+ 'heartbeat ' => 0 ,
215
+ 'connection_timeout ' => 3.0 ,
216
+ 'read_write_timeout ' => 3.0 ,
66
217
];
67
218
}
68
219
}
0 commit comments