Skip to content

Commit d085d2c

Browse files
committed
Warn when there are active listeners on a connection that is released
Closes: MagicStack#190.
1 parent 089ac81 commit d085d2c

File tree

5 files changed

+82
-6
lines changed

5 files changed

+82
-6
lines changed

asyncpg/connection.py

+24
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import collections
1010
import collections.abc
11+
import itertools
1112
import struct
1213
import time
1314
import warnings
@@ -1172,6 +1173,29 @@ def _set_proxy(self, proxy):
11721173

11731174
self._proxy = proxy
11741175

1176+
def _check_listeners(self, listeners, listener_type):
1177+
if listeners:
1178+
count = len(listeners)
1179+
1180+
w = exceptions.InterfaceWarning(
1181+
'{conn!r} is being released to the pool but has {c} active '
1182+
'{type} listener{s}'.format(
1183+
conn=self, c=count, type=listener_type,
1184+
s='s' if count > 1 else ''))
1185+
1186+
warnings.warn(w)
1187+
1188+
def _on_release(self, stacklevel=1):
1189+
# Invalidate external references to the connection.
1190+
self._pool_release_ctr += 1
1191+
# Called when the connection is about to be released to the pool.
1192+
# Let's check that the user has not left any listeners on it.
1193+
self._check_listeners(
1194+
list(itertools.chain.from_iterable(self._listeners.values())),
1195+
'notification')
1196+
self._check_listeners(
1197+
self._log_listeners, 'log')
1198+
11751199
def _drop_local_statement_cache(self):
11761200
self._stmt_cache.clear()
11771201

asyncpg/connresource.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ class ConnectionResource:
2727

2828
def __init__(self, connection):
2929
self._connection = connection
30-
self._con_release_ctr = getattr(connection, '_pool_release_ctr', None)
30+
self._con_release_ctr = connection._pool_release_ctr
3131

3232
def _check_conn_validity(self, meth_name):
33-
con_release_ctr = getattr(self._connection, '_pool_release_ctr', None)
33+
con_release_ctr = self._connection._pool_release_ctr
3434
if con_release_ctr != self._con_release_ctr:
3535
raise exceptions.InterfaceError(
3636
'cannot call {}.{}(): '

asyncpg/exceptions/_base.py

+29-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
__all__ = ('PostgresError', 'FatalPostgresError', 'UnknownPostgresError',
13-
'InterfaceError', 'PostgresLogMessage')
13+
'InterfaceError', 'InterfaceWarning', 'PostgresLogMessage')
1414

1515

1616
def _is_asyncpg_class(cls):
@@ -159,9 +159,36 @@ class UnknownPostgresError(FatalPostgresError):
159159
"""An error with an unknown SQLSTATE code."""
160160

161161

162-
class InterfaceError(Exception):
162+
class InterfaceMessage:
163+
def __init__(self, *, detail=None, hint=None):
164+
self.detail = detail
165+
self.hint = hint
166+
167+
def __str__(self):
168+
msg = self.args[0]
169+
if self.detail:
170+
msg += '\nDETAIL: {}'.format(self.detail)
171+
if self.hint:
172+
msg += '\nHINT: {}'.format(self.hint)
173+
174+
return msg
175+
176+
177+
class InterfaceError(InterfaceMessage, Exception):
163178
"""An error caused by improper use of asyncpg API."""
164179

180+
def __init__(self, msg, *, detail=None, hint=None):
181+
InterfaceMessage.__init__(self, detail=detail, hint=hint)
182+
Exception.__init__(self, msg)
183+
184+
185+
class InterfaceWarning(InterfaceMessage, UserWarning):
186+
"""A warning caused by an improper use of asyncpg API."""
187+
188+
def __init__(self, msg, *, detail=None, hint=None):
189+
InterfaceMessage.__init__(self, detail=detail, hint=hint)
190+
Warning.__init__(self, msg)
191+
165192

166193
class PostgresLogMessage(PostgresMessage):
167194
"""A base class for non-error server messages."""

asyncpg/pool.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ async def release(self):
175175
assert self._in_use
176176
self._in_use = False
177177

178-
# Invalidate external references to the connection.
179-
self._con._pool_release_ctr += 1
178+
self._con._on_release()
180179

181180
if self._con.is_closed():
182181
self._con = None

tests/test_listeners.py

+26
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,20 @@ def listener1(*args):
7676
await q1.get(),
7777
(con1, con2.get_server_pid(), 'ipc', 'hello'))
7878

79+
await con1.remove_listener('ipc', listener1)
80+
81+
async def test_dangling_listener_warns(self):
82+
async with self.create_pool(database='postgres') as pool:
83+
with self.assertWarnsRegex(
84+
exceptions.InterfaceWarning,
85+
'.*Connection.*is being released to the pool but '
86+
'has 1 active notification listener'):
87+
async with pool.acquire() as con:
88+
def listener1(*args):
89+
pass
90+
91+
await con.add_listener('ipc', listener1)
92+
7993

8094
class TestLogListeners(tb.ConnectedTestCase):
8195

@@ -231,3 +245,15 @@ def notice_callb(con, message):
231245
with self.assertRaises(exceptions.InvalidCharacterValueForCastError):
232246
await raise_message('', '22018')
233247
self.assertTrue(q1.empty())
248+
249+
async def test_dangling_log_listener_warns(self):
250+
async with self.create_pool(database='postgres') as pool:
251+
with self.assertWarnsRegex(
252+
exceptions.InterfaceWarning,
253+
'.*Connection.*is being released to the pool but '
254+
'has 1 active log listener'):
255+
async with pool.acquire() as con:
256+
def listener1(*args):
257+
pass
258+
259+
con.add_log_listener(listener1)

0 commit comments

Comments
 (0)