10
10
namespace WebSocket ;
11
11
12
12
use Closure ;
13
+ use Psr \Log \NullLogger ;
13
14
use Throwable ;
14
15
15
16
class Server extends Base
@@ -24,23 +25,31 @@ class Server extends Base
24
25
'timeout ' => null ,
25
26
];
26
27
27
- protected $ addr ;
28
28
protected $ port ;
29
29
protected $ listening ;
30
30
protected $ request ;
31
31
protected $ request_path ;
32
- private $ connectors = [];
32
+ private $ connections = [];
33
+ private $ listen = false ;
34
+
35
+
36
+ /* ---------- Construct & Destruct ----------------------------------------------- */
33
37
34
38
/**
35
39
* @param array $options
36
40
* Associative array containing:
37
- * - timeout : Set the socket timeout in seconds .
41
+ * - filter : Array of opcodes to handle. Default: ['text', 'binary'] .
38
42
* - fragment_size: Set framgemnt size. Default: 4096
39
- * - port: Chose port for listening. Default 8000.
43
+ * - logger: PSR-3 compatible logger. Default NullLogger.
44
+ * - port: Chose port for listening. Default 8000.
45
+ * - return_obj: If receive() function return Message instance. Default false.
46
+ * - timeout: Set the socket timeout in seconds.
40
47
*/
41
48
public function __construct (array $ options = [])
42
49
{
43
- $ this ->options = array_merge (self ::$ default_options , $ options );
50
+ $ this ->options = array_merge (self ::$ default_options , [
51
+ 'logger ' => new NullLogger (),
52
+ ], $ options );
44
53
$ this ->port = $ this ->options ['port ' ];
45
54
$ this ->setLogger ($ this ->options ['logger ' ]);
46
55
@@ -65,57 +74,40 @@ public function __construct(array $options = [])
65
74
$ this ->logger ->info ("Server listening to port {$ this ->port }" );
66
75
}
67
76
77
+ /**
78
+ * Disconnect streams on shutdown.
79
+ */
68
80
public function __destruct ()
69
81
{
82
+ /*
70
83
if ($this->connection && $this->connection->isConnected()) {
71
84
$this->connection->disconnect();
72
85
}
73
86
$this->connection = null;
74
- }
75
-
76
- public function getPort (): int
77
- {
78
- return $ this ->port ;
79
- }
80
-
81
- public function getPath (): string
82
- {
83
- return $ this ->request_path ;
84
- }
85
-
86
- public function getRequest (): array
87
- {
88
- return $ this ->request ;
89
- }
90
-
91
- public function getHeader ($ header ): ?string
92
- {
93
- foreach ($ this ->request as $ row ) {
94
- if (stripos ($ row , $ header ) !== false ) {
95
- list ($ headername , $ headervalue ) = explode (": " , $ row );
96
- return trim ($ headervalue );
87
+ */
88
+ foreach ($ this ->connections as $ connection ) {
89
+ if ($ connection ->isConnected ()) {
90
+ $ connection ->disconnect ();
97
91
}
98
92
}
99
- return null ;
93
+ $ this -> connections = [] ;
100
94
}
101
95
102
- public function accept (): bool
103
- {
104
- $ this ->connection = null ;
105
- return (bool )$ this ->listening ;
106
- }
96
+
97
+ /* ---------- Server operations -------------------------------------------------- */
107
98
108
99
/**
109
100
* Set server to listen to incoming requests.
110
101
* @param Closure A callback function that will be called when server receives message.
111
102
* function (Message $message, Connection $connection = null)
112
- * If callback function returns not null value, the listener will halt and return that value.
103
+ * If callback function returns non- null value, the listener will halt and return that value.
113
104
* Otherwise it will continue listening and propagating messages.
114
- * @return Returns any not null value returned by callback function.
105
+ * @return mixed Returns any non- null value returned by callback function.
115
106
*/
116
107
public function listen (Closure $ callback )
117
108
{
118
- while (true ) {
109
+ $ this ->listen = true ;
110
+ while ($ this ->listen ) {
119
111
// Server accept
120
112
if ($ stream = @stream_socket_accept ($ this ->listening , 0 )) {
121
113
$ peer = stream_socket_get_name ($ stream , true );
@@ -126,18 +118,18 @@ public function listen(Closure $callback)
126
118
$ connection ->setTimeout ($ this ->options ['timeout ' ]);
127
119
}
128
120
$ this ->performHandshake ($ connection );
129
- $ this ->connectors [$ peer ] = $ connection ;
121
+ $ this ->connections [$ peer ] = $ connection ;
130
122
}
131
123
132
124
// Collect streams to listen to
133
125
$ streams = array_filter (array_map (function ($ connection , $ peer ) {
134
126
$ stream = $ connection ->getStream ();
135
127
if (is_null ($ stream )) {
136
128
$ this ->logger ->debug ("[server] Remove {$ peer } from listener stack " );
137
- unset($ this ->connectors [$ peer ]);
129
+ unset($ this ->connections [$ peer ]);
138
130
}
139
131
return $ stream ;
140
- }, $ this ->connectors , array_keys ($ this ->connectors )));
132
+ }, $ this ->connections , array_keys ($ this ->connections )));
141
133
142
134
// Handle incoming
143
135
if (!empty ($ streams )) {
@@ -149,11 +141,15 @@ public function listen(Closure $callback)
149
141
try {
150
142
$ result = null ;
151
143
$ peer = stream_socket_get_name ($ stream , true );
152
- $ connection = $ this ->connectors [$ peer ];
144
+ if (empty ($ peer )) {
145
+ $ this ->logger ->warning ("[server] Got detached stream ' {$ peer }' " );
146
+ continue ;
147
+ }
148
+ $ connection = $ this ->connections [$ peer ];
153
149
$ this ->logger ->debug ("[server] Handling {$ peer }" );
154
150
$ message = $ connection ->pullMessage ();
155
151
if (!$ connection ->isConnected ()) {
156
- unset($ this ->connectors [$ peer ]);
152
+ unset($ this ->connections [$ peer ]);
157
153
$ connection = null ;
158
154
}
159
155
// Trigger callback according to filter
@@ -175,14 +171,102 @@ public function listen(Closure $callback)
175
171
}
176
172
}
177
173
174
+ /**
175
+ * Tell server to stop listening to incoming requests.
176
+ * Active connections are still available when restarting listening.
177
+ */
178
+ public function stop (): void
179
+ {
180
+ $ this ->listen = false ;
181
+ }
182
+
183
+ /**
184
+ * Accept an incoming request.
185
+ * Note that this operation will block accepting additional requests.
186
+ * @return bool True if listening
187
+ * @deprecated Will be removed in future version
188
+ */
189
+ public function accept (): bool
190
+ {
191
+ $ this ->connection = null ;
192
+ return (bool )$ this ->listening ;
193
+ }
194
+
195
+
196
+ /* ---------- Server option functions -------------------------------------------- */
197
+
198
+ /**
199
+ * Get current port.
200
+ * @return int port
201
+ */
202
+ public function getPort (): int
203
+ {
204
+ return $ this ->port ;
205
+ }
206
+
207
+ // Inherited from Base:
208
+ // - setLogger
209
+ // - setTimeout
210
+ // - setFragmentSize
211
+ // - getFragmentSize
212
+
213
+
214
+ /* ---------- Connection broadcast operations ------------------------------------ */
215
+
216
+ /**
217
+ * Close all connections.
218
+ * @param int Close status, default: 1000
219
+ * @param string Close message, default: 'ttfn'
220
+ */
178
221
public function close (int $ status = 1000 , string $ message = 'ttfn ' ): void
179
222
{
180
- parent ::close ($ status , $ message );
181
- foreach ($ this ->connectors as $ connection ) {
182
- $ connection ->close ($ status , $ message );
223
+ foreach ($ this ->connections as $ connection ) {
224
+ if ($ connection ->isConnected ()) {
225
+ $ connection ->close ($ status , $ message );
226
+ }
227
+ }
228
+ }
229
+
230
+ // Inherited from Base:
231
+ // - receive
232
+ // - send
233
+ // - text, binary, ping, pong
234
+
235
+
236
+ /* ---------- Connection functions (all deprecated) ------------------------------ */
237
+
238
+ public function getPath (): string
239
+ {
240
+ return $ this ->request_path ;
241
+ }
242
+
243
+ public function getRequest (): array
244
+ {
245
+ return $ this ->request ;
246
+ }
247
+
248
+ public function getHeader ($ header ): ?string
249
+ {
250
+ foreach ($ this ->request as $ row ) {
251
+ if (stripos ($ row , $ header ) !== false ) {
252
+ list ($ headername , $ headervalue ) = explode (": " , $ row );
253
+ return trim ($ headervalue );
254
+ }
183
255
}
256
+ return null ;
184
257
}
185
258
259
+ // Inherited from Base:
260
+ // - getLastOpcode
261
+ // - getCloseStatus
262
+ // - isConnected
263
+ // - disconnect
264
+ // - getName, getPeer, getPier
265
+
266
+
267
+ /* ---------- Helper functions --------------------------------------------------- */
268
+
269
+ // Connect when read/write operation is performed.
186
270
protected function connect (): void
187
271
{
188
272
$ error = null ;
@@ -215,8 +299,10 @@ protected function connect(): void
215
299
'pier ' => $ this ->connection ->getPeer (),
216
300
]);
217
301
$ this ->performHandshake ($ this ->connection );
302
+ $ this ->connections = ['* ' => $ this ->connection ];
218
303
}
219
304
305
+ // Perform upgrade handshake on new connections.
220
306
protected function performHandshake (Connection $ connection ): void
221
307
{
222
308
$ request = '' ;
0 commit comments