16
16
import wsproto .frame_protocol as wsframeproto
17
17
from yarl import URL
18
18
19
- from ._channel import open_channel , EndOfChannel
20
19
from .version import __version__
21
20
22
21
RECEIVE_BYTES = 4096
@@ -452,7 +451,7 @@ def __init__(self, stream, wsproto, *, path=None):
452
451
self ._reader_running = True
453
452
self ._path = path
454
453
self ._subprotocol = None
455
- self ._put_channel , self ._get_channel = open_channel (0 )
454
+ self ._send_channel , self ._recv_channel = trio . open_memory_channel (0 )
456
455
self ._pings = OrderedDict ()
457
456
# Set when the server has received a connection request event. This
458
457
# future is never set on client connections.
@@ -541,8 +540,8 @@ async def get_message(self):
541
540
if self ._close_reason :
542
541
raise ConnectionClosed (self ._close_reason )
543
542
try :
544
- message = await self ._get_channel . get ()
545
- except EndOfChannel :
543
+ message = await self ._recv_channel . receive ()
544
+ except trio . EndOfChannel :
546
545
raise ConnectionClosed (self ._close_reason ) from None
547
546
return message
548
547
@@ -601,7 +600,7 @@ async def send_message(self, message):
601
600
self ._wsproto .send_data (message )
602
601
await self ._write_pending ()
603
602
604
- def _abort_web_socket (self ):
603
+ async def _abort_web_socket (self ):
605
604
'''
606
605
If a stream is closed outside of this class, e.g. due to network
607
606
conditions or because some other code closed our stream object, then we
@@ -612,7 +611,7 @@ def _abort_web_socket(self):
612
611
if not self ._wsproto .closed :
613
612
self ._wsproto .close (close_reason )
614
613
if self ._close_reason is None :
615
- self ._close_web_socket (close_reason )
614
+ await self ._close_web_socket (close_reason )
616
615
self ._reader_running = False
617
616
# We didn't really handshake, but we want any task waiting on this event
618
617
# (e.g. self.aclose()) to resume.
@@ -643,7 +642,7 @@ async def _close_stream(self):
643
642
# This means the TCP connection is already dead.
644
643
pass
645
644
646
- def _close_web_socket (self , code , reason = None ):
645
+ async def _close_web_socket (self , code , reason = None ):
647
646
'''
648
647
Mark the WebSocket as closed. Close the message channel so that if any
649
648
tasks are suspended in get_message(), they will wake up with a
@@ -652,7 +651,7 @@ def _close_web_socket(self, code, reason=None):
652
651
self ._close_reason = CloseReason (code , reason )
653
652
exc = ConnectionClosed (self ._close_reason )
654
653
logger .debug ('conn#%d websocket closed %r' , self ._id , exc )
655
- self ._put_channel . close ()
654
+ await self ._send_channel . aclose ()
656
655
657
656
async def _get_request (self ):
658
657
'''
@@ -700,7 +699,7 @@ async def _handle_connection_closed_event(self, event):
700
699
:param event:
701
700
'''
702
701
await self ._write_pending ()
703
- self ._close_web_socket (event .code , event .reason or None )
702
+ await self ._close_web_socket (event .code , event .reason or None )
704
703
self ._close_handshake .set ()
705
704
706
705
async def _handle_connection_failed_event (self , event ):
@@ -710,7 +709,7 @@ async def _handle_connection_failed_event(self, event):
710
709
:param event:
711
710
'''
712
711
await self ._write_pending ()
713
- self ._close_web_socket (event .code , event .reason or None )
712
+ await self ._close_web_socket (event .code , event .reason or None )
714
713
await self ._close_stream ()
715
714
self ._open_handshake .set ()
716
715
self ._close_handshake .set ()
@@ -723,7 +722,7 @@ async def _handle_bytes_received_event(self, event):
723
722
'''
724
723
self ._bytes_message += event .data
725
724
if event .message_finished :
726
- await self ._put_channel . put (self ._bytes_message )
725
+ await self ._send_channel . send (self ._bytes_message )
727
726
self ._bytes_message = b''
728
727
729
728
async def _handle_text_received_event (self , event ):
@@ -734,7 +733,7 @@ async def _handle_text_received_event(self, event):
734
733
'''
735
734
self ._str_message += event .data
736
735
if event .message_finished :
737
- await self ._put_channel . put (self ._str_message )
736
+ await self ._send_channel . send (self ._str_message )
738
737
self ._str_message = ''
739
738
740
739
async def _handle_ping_received_event (self , event ):
@@ -812,15 +811,15 @@ async def _reader_task(self):
812
811
try :
813
812
data = await self ._stream .receive_some (RECEIVE_BYTES )
814
813
except (trio .BrokenResourceError , trio .ClosedResourceError ):
815
- self ._abort_web_socket ()
814
+ await self ._abort_web_socket ()
816
815
break
817
816
if len (data ) == 0 :
818
817
logger .debug ('conn#%d received zero bytes (connection closed)' ,
819
818
self ._id )
820
819
# If TCP closed before WebSocket, then record it as an abnormal
821
820
# closure.
822
821
if not self ._wsproto .closed :
823
- self ._abort_web_socket ()
822
+ await self ._abort_web_socket ()
824
823
break
825
824
else :
826
825
logger .debug ('conn#%d received %d bytes' , self ._id , len (data ))
@@ -839,7 +838,7 @@ async def _write_pending(self):
839
838
try :
840
839
await self ._stream .send_all (data )
841
840
except (trio .BrokenResourceError , trio .ClosedResourceError ):
842
- self ._abort_web_socket ()
841
+ await self ._abort_web_socket ()
843
842
raise ConnectionClosed (self ._close_reason ) from None
844
843
else :
845
844
logger .debug ('conn#%d no pending data to send' , self ._id )
0 commit comments