Skip to content

Commit 9a71be1

Browse files
authored
PYTHON-4740 Convert asyncio.TimeoutError to socket.timeout for compat (#1864)
1 parent c136684 commit 9a71be1

File tree

5 files changed

+59
-71
lines changed

5 files changed

+59
-71
lines changed

pymongo/asynchronous/bulk.py

-4
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,6 @@ async def write_command(
313313
if isinstance(exc, (NotPrimaryError, OperationFailure)):
314314
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
315315
raise
316-
finally:
317-
bwc.start_time = datetime.datetime.now()
318316
return reply # type: ignore[return-value]
319317

320318
async def unack_write(
@@ -403,8 +401,6 @@ async def unack_write(
403401
assert bwc.start_time is not None
404402
bwc._fail(request_id, failure, duration)
405403
raise
406-
finally:
407-
bwc.start_time = datetime.datetime.now()
408404
return result # type: ignore[return-value]
409405

410406
async def _execute_batch_unack(

pymongo/asynchronous/client_bulk.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,6 @@ async def write_command(
319319
await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
320320
else:
321321
await self.client._process_response({}, bwc.session) # type: ignore[arg-type]
322-
finally:
323-
bwc.start_time = datetime.datetime.now()
324322
return reply # type: ignore[return-value]
325323

326324
async def unack_write(
@@ -410,9 +408,7 @@ async def unack_write(
410408
bwc._fail(request_id, failure, duration)
411409
# Top-level error will be embedded in ClientBulkWriteException.
412410
reply = {"error": exc}
413-
finally:
414-
bwc.start_time = datetime.datetime.now()
415-
return result # type: ignore[return-value]
411+
return reply
416412

417413
async def _execute_batch_unack(
418414
self,

pymongo/network_layer.py

+57-53
Original file line numberDiff line numberDiff line change
@@ -64,65 +64,69 @@ async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> Non
6464
loop = asyncio.get_event_loop()
6565
try:
6666
if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
67-
if sys.platform == "win32":
68-
await asyncio.wait_for(_async_sendall_ssl_windows(sock, buf), timeout=timeout)
69-
else:
70-
await asyncio.wait_for(_async_sendall_ssl(sock, buf, loop), timeout=timeout)
67+
await asyncio.wait_for(_async_sendall_ssl(sock, buf, loop), timeout=timeout)
7168
else:
7269
await asyncio.wait_for(loop.sock_sendall(sock, buf), timeout=timeout) # type: ignore[arg-type]
70+
except asyncio.TimeoutError as exc:
71+
# Convert the asyncio.wait_for timeout error to socket.timeout which pool.py understands.
72+
raise socket.timeout("timed out") from exc
7373
finally:
7474
sock.settimeout(timeout)
7575

7676

77-
async def _async_sendall_ssl(
78-
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
79-
) -> None:
80-
view = memoryview(buf)
81-
fd = sock.fileno()
82-
sent = 0
83-
84-
def _is_ready(fut: Future) -> None:
85-
loop.remove_writer(fd)
86-
loop.remove_reader(fd)
87-
if fut.done():
88-
return
89-
fut.set_result(None)
90-
91-
while sent < len(buf):
92-
try:
93-
sent += sock.send(view[sent:])
94-
except BLOCKING_IO_ERRORS as exc:
95-
fd = sock.fileno()
96-
# Check for closed socket.
97-
if fd == -1:
98-
raise SSLError("Underlying socket has been closed") from None
99-
if isinstance(exc, BLOCKING_IO_READ_ERROR):
100-
fut = loop.create_future()
101-
loop.add_reader(fd, _is_ready, fut)
102-
await fut
103-
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
104-
fut = loop.create_future()
105-
loop.add_writer(fd, _is_ready, fut)
106-
await fut
107-
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
108-
fut = loop.create_future()
109-
loop.add_reader(fd, _is_ready, fut)
110-
loop.add_writer(fd, _is_ready, fut)
111-
await fut
112-
113-
114-
# The default Windows asyncio event loop does not support loop.add_reader/add_writer: https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
115-
async def _async_sendall_ssl_windows(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
116-
view = memoryview(buf)
117-
total_length = len(buf)
118-
total_sent = 0
119-
while total_sent < total_length:
120-
try:
121-
sent = sock.send(view[total_sent:])
122-
except BLOCKING_IO_ERRORS:
123-
await asyncio.sleep(0.5)
124-
sent = 0
125-
total_sent += sent
77+
if sys.platform != "win32":
78+
79+
async def _async_sendall_ssl(
80+
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
81+
) -> None:
82+
view = memoryview(buf)
83+
fd = sock.fileno()
84+
sent = 0
85+
86+
def _is_ready(fut: Future) -> None:
87+
loop.remove_writer(fd)
88+
loop.remove_reader(fd)
89+
if fut.done():
90+
return
91+
fut.set_result(None)
92+
93+
while sent < len(buf):
94+
try:
95+
sent += sock.send(view[sent:])
96+
except BLOCKING_IO_ERRORS as exc:
97+
fd = sock.fileno()
98+
# Check for closed socket.
99+
if fd == -1:
100+
raise SSLError("Underlying socket has been closed") from None
101+
if isinstance(exc, BLOCKING_IO_READ_ERROR):
102+
fut = loop.create_future()
103+
loop.add_reader(fd, _is_ready, fut)
104+
await fut
105+
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
106+
fut = loop.create_future()
107+
loop.add_writer(fd, _is_ready, fut)
108+
await fut
109+
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
110+
fut = loop.create_future()
111+
loop.add_reader(fd, _is_ready, fut)
112+
loop.add_writer(fd, _is_ready, fut)
113+
await fut
114+
else:
115+
# The default Windows asyncio event loop does not support loop.add_reader/add_writer:
116+
# https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
117+
async def _async_sendall_ssl(
118+
sock: Union[socket.socket, _sslConn], buf: bytes, dummy: AbstractEventLoop
119+
) -> None:
120+
view = memoryview(buf)
121+
total_length = len(buf)
122+
total_sent = 0
123+
while total_sent < total_length:
124+
try:
125+
sent = sock.send(view[total_sent:])
126+
except BLOCKING_IO_ERRORS:
127+
await asyncio.sleep(0.5)
128+
sent = 0
129+
total_sent += sent
126130

127131

128132
def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:

pymongo/synchronous/bulk.py

-4
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,6 @@ def write_command(
313313
if isinstance(exc, (NotPrimaryError, OperationFailure)):
314314
client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
315315
raise
316-
finally:
317-
bwc.start_time = datetime.datetime.now()
318316
return reply # type: ignore[return-value]
319317

320318
def unack_write(
@@ -403,8 +401,6 @@ def unack_write(
403401
assert bwc.start_time is not None
404402
bwc._fail(request_id, failure, duration)
405403
raise
406-
finally:
407-
bwc.start_time = datetime.datetime.now()
408404
return result # type: ignore[return-value]
409405

410406
def _execute_batch_unack(

pymongo/synchronous/client_bulk.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,6 @@ def write_command(
319319
self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
320320
else:
321321
self.client._process_response({}, bwc.session) # type: ignore[arg-type]
322-
finally:
323-
bwc.start_time = datetime.datetime.now()
324322
return reply # type: ignore[return-value]
325323

326324
def unack_write(
@@ -410,9 +408,7 @@ def unack_write(
410408
bwc._fail(request_id, failure, duration)
411409
# Top-level error will be embedded in ClientBulkWriteException.
412410
reply = {"error": exc}
413-
finally:
414-
bwc.start_time = datetime.datetime.now()
415-
return result # type: ignore[return-value]
411+
return reply
416412

417413
def _execute_batch_unack(
418414
self,

0 commit comments

Comments
 (0)