@@ -172,10 +172,7 @@ public function waitForResponses( ?int $timeoutMs = null ) : void
172
172
173
173
while ( $ this ->hasUnhandledResponses () )
174
174
{
175
- foreach ( $ this ->getSocketsHavingResponse () as $ socket )
176
- {
177
- $ this ->fetchResponseAndNotifyCallback ( $ socket , $ timeoutMs );
178
- }
175
+ $ this ->handleReadyResponses ( $ timeoutMs );
179
176
}
180
177
}
181
178
@@ -193,10 +190,12 @@ private function fetchResponseAndNotifyCallback( Socket $socket, ?int $timeoutMs
193
190
}
194
191
catch ( Throwable $ e )
195
192
{
196
- $ this ->sockets ->remove ( $ socket ->getId () );
197
-
198
193
$ socket ->notifyFailureCallbacks ( $ e );
199
194
}
195
+ finally
196
+ {
197
+ $ this ->sockets ->remove ( $ socket ->getId () );
198
+ }
200
199
}
201
200
202
201
/**
@@ -207,18 +206,6 @@ public function hasUnhandledResponses() : bool
207
206
return $ this ->sockets ->hasBusySockets ();
208
207
}
209
208
210
- /**
211
- * @return Generator|Socket[]
212
- * @throws ReadFailedException
213
- */
214
- private function getSocketsHavingResponse () : Generator
215
- {
216
- foreach ( $ this ->getRequestIdsHavingResponse () as $ requestId )
217
- {
218
- yield $ this ->sockets ->getById ( $ requestId );
219
- }
220
- }
221
-
222
209
/**
223
210
* @param int $requestId
224
211
*
@@ -244,7 +231,12 @@ public function getRequestIdsHavingResponse() : array
244
231
$ reads = $ this ->sockets ->collectResources ();
245
232
$ writes = $ excepts = null ;
246
233
247
- stream_select ( $ reads , $ writes , $ excepts , 0 , Socket::STREAM_SELECT_USEC );
234
+ $ result = @stream_select ( $ reads , $ writes , $ excepts , 0 , Socket::STREAM_SELECT_USEC );
235
+
236
+ if ( false === $ result || 0 === count ( $ reads ) )
237
+ {
238
+ return [];
239
+ }
248
240
249
241
return $ this ->sockets ->getSocketIdsByResources ( $ reads );
250
242
}
@@ -264,6 +256,10 @@ public function readResponses( ?int $timeoutMs = null, int ...$requestIds ) : Ge
264
256
yield $ this ->sockets ->getById ( $ requestId )->fetchResponse ( $ timeoutMs );
265
257
}
266
258
catch ( Throwable $ e )
259
+ {
260
+ # Skip unknown request ids
261
+ }
262
+ finally
267
263
{
268
264
$ this ->sockets ->remove ( $ requestId );
269
265
}
0 commit comments