@@ -246,21 +246,24 @@ def test_recv_streaming_connection_closed_ok(self):
246
246
"""recv_streaming raises ConnectionClosedOK after a normal closure."""
247
247
self .remote_connection .close ()
248
248
with self .assertRaises (ConnectionClosedOK ):
249
- list (self .connection .recv_streaming ())
249
+ for _ in self .connection .recv_streaming ():
250
+ self .fail ("did not raise" )
250
251
251
252
def test_recv_streaming_connection_closed_error (self ):
252
253
"""recv_streaming raises ConnectionClosedError after an error."""
253
254
self .remote_connection .close (code = CloseCode .INTERNAL_ERROR )
254
255
with self .assertRaises (ConnectionClosedError ):
255
- list (self .connection .recv_streaming ())
256
+ for _ in self .connection .recv_streaming ():
257
+ self .fail ("did not raise" )
256
258
257
259
def test_recv_streaming_during_recv (self ):
258
260
"""recv_streaming raises RuntimeError when called concurrently with recv."""
259
261
recv_thread = threading .Thread (target = self .connection .recv )
260
262
recv_thread .start ()
261
263
262
264
with self .assertRaises (RuntimeError ) as raised :
263
- list (self .connection .recv_streaming ())
265
+ for _ in self .connection .recv_streaming ():
266
+ self .fail ("did not raise" )
264
267
self .assertEqual (
265
268
str (raised .exception ),
266
269
"cannot call recv_streaming while another thread "
@@ -278,7 +281,8 @@ def test_recv_streaming_during_recv_streaming(self):
278
281
recv_streaming_thread .start ()
279
282
280
283
with self .assertRaises (RuntimeError ) as raised :
281
- list (self .connection .recv_streaming ())
284
+ for _ in self .connection .recv_streaming ():
285
+ self .fail ("did not raise" )
282
286
self .assertEqual (
283
287
str (raised .exception ),
284
288
r"cannot call recv_streaming while another thread "
@@ -374,7 +378,7 @@ def test_send_empty_iterable(self):
374
378
"""send does nothing when called with an empty iterable."""
375
379
self .connection .send ([])
376
380
self .connection .close ()
377
- self .assertEqual (list (iter ( self .remote_connection ) ), [])
381
+ self .assertEqual (list (self .remote_connection ), [])
378
382
379
383
def test_send_mixed_iterable (self ):
380
384
"""send raises TypeError when called with an iterable of inconsistent types."""
@@ -437,7 +441,7 @@ def test_close_waits_for_connection_closed(self):
437
441
438
442
def test_close_timeout_waiting_for_close_frame (self ):
439
443
"""close times out if no close frame is received."""
440
- with self .drop_eof_rcvd (), self .drop_frames_rcvd ():
444
+ with self .drop_frames_rcvd (), self .drop_eof_rcvd ():
441
445
self .connection .close ()
442
446
443
447
with self .assertRaises (ConnectionClosedError ) as raised :
@@ -464,6 +468,10 @@ def test_close_timeout_waiting_for_connection_closed(self):
464
468
self .assertIsInstance (exc .__cause__ , (socket .timeout , TimeoutError ))
465
469
466
470
def test_close_waits_for_recv (self ):
471
+ # The sync implementation doesn't have a buffer for incoming messsages.
472
+ # It requires reading incoming frames until the close frame is reached.
473
+ # This behavior — close() blocks until recv() is called — is less than
474
+ # ideal and inconsistent with the asyncio implementation.
467
475
self .remote_connection .send ("😀" )
468
476
469
477
close_thread = threading .Thread (target = self .connection .close )
@@ -547,6 +555,25 @@ def closer():
547
555
548
556
close_thread .join ()
549
557
558
+ def test_close_during_recv (self ):
559
+ """close aborts recv when called concurrently with recv."""
560
+
561
+ def closer ():
562
+ time .sleep (MS )
563
+ self .connection .close ()
564
+
565
+ close_thread = threading .Thread (target = closer )
566
+ close_thread .start ()
567
+
568
+ with self .assertRaises (ConnectionClosedOK ) as raised :
569
+ self .connection .recv ()
570
+
571
+ exc = raised .exception
572
+ self .assertEqual (str (exc ), "sent 1000 (OK); then received 1000 (OK)" )
573
+ self .assertIsNone (exc .__cause__ )
574
+
575
+ close_thread .join ()
576
+
550
577
def test_close_during_send (self ):
551
578
"""close fails the connection when called concurrently with send."""
552
579
close_gate = threading .Event ()
@@ -599,42 +626,45 @@ def test_ping_explicit_binary(self):
599
626
self .connection .ping (b"ping" )
600
627
self .assertFrameSent (Frame (Opcode .PING , b"ping" ))
601
628
602
- def test_ping_duplicate_payload (self ):
603
- """ping rejects the same payload until receiving the pong."""
604
- with self .remote_connection .protocol_mutex : # block response to ping
605
- pong_waiter = self .connection .ping ("idem" )
606
- with self .assertRaises (RuntimeError ) as raised :
607
- self .connection .ping ("idem" )
608
- self .assertEqual (
609
- str (raised .exception ),
610
- "already waiting for a pong with the same data" ,
611
- )
612
- self .assertTrue (pong_waiter .wait (MS ))
613
- self .connection .ping ("idem" ) # doesn't raise an exception
614
-
615
629
def test_acknowledge_ping (self ):
616
630
"""ping is acknowledged by a pong with the same payload."""
617
- with self .drop_frames_rcvd ():
631
+ with self .drop_frames_rcvd (): # drop automatic response to ping
618
632
pong_waiter = self .connection .ping ("this" )
619
- self .assertFalse (pong_waiter .wait (MS ))
620
633
self .remote_connection .pong ("this" )
621
634
self .assertTrue (pong_waiter .wait (MS ))
622
635
623
636
def test_acknowledge_ping_non_matching_pong (self ):
624
637
"""ping isn't acknowledged by a pong with a different payload."""
625
- with self .drop_frames_rcvd ():
638
+ with self .drop_frames_rcvd (): # drop automatic response to ping
626
639
pong_waiter = self .connection .ping ("this" )
627
640
self .remote_connection .pong ("that" )
628
641
self .assertFalse (pong_waiter .wait (MS ))
629
642
630
643
def test_acknowledge_previous_ping (self ):
631
644
"""ping is acknowledged by a pong with the same payload as a later ping."""
632
- with self .drop_frames_rcvd ():
645
+ with self .drop_frames_rcvd (): # drop automatic response to ping
633
646
pong_waiter = self .connection .ping ("this" )
634
647
self .connection .ping ("that" )
635
648
self .remote_connection .pong ("that" )
636
649
self .assertTrue (pong_waiter .wait (MS ))
637
650
651
+ def test_ping_duplicate_payload (self ):
652
+ """ping rejects the same payload until receiving the pong."""
653
+ with self .drop_frames_rcvd (): # drop response to ping
654
+ pong_waiter = self .connection .ping ("idem" )
655
+
656
+ with self .assertRaises (RuntimeError ) as raised :
657
+ self .connection .ping ("idem" )
658
+ self .assertEqual (
659
+ str (raised .exception ),
660
+ "already waiting for a pong with the same data" ,
661
+ )
662
+
663
+ self .remote_connection .pong ("idem" )
664
+ self .assertTrue (pong_waiter .wait (MS ))
665
+
666
+ self .connection .ping ("idem" ) # doesn't raise an exception
667
+
638
668
# Test pong.
639
669
640
670
def test_pong (self ):
0 commit comments