Skip to content

Commit b04556e

Browse files
vitaly-burovoyelprans
authored andcommitted
Invalidate type cache on schema changes affecting statement result.
It is a logical continuation of the commit 749d857 Previously if a query used a composite type and it has changed then asyncpg raised either IndexError or RuntimeError with no easy way to recover from it. Unlike cases like in the mentioned commit here we can't reprepare and resend the query because we fail at the very end in attempt to create a Record from the response. It means the query is already done at the server's side. Everything we can do in such case is to invalidate caches and mark the failed prepared statement as "closed", it prevents getting it from the connection's statement cache and the next query will be reprepared. After all we have to reraise the exception to the caller. For handling schema changes a new Connection.reload_schema_state() is added. Now this method invalidates internal cached objects after changes in DB's schema. In future it can do more actions to reflect schema changes to internal representation. It is intended for being called from callbacks for NOTIFY generated from EVENT TRIGGERs or for using in migrations. The other change is replacing RuntimeError=>OutdatedSchemaCacheError when types in a composite type are not equal: 1. it is easier to catch; 2. it is easier to parse because now there is the composite type's name appear. It can be considered as affecting compatibility with previous releases.
1 parent b10e883 commit b04556e

File tree

7 files changed

+321
-6
lines changed

7 files changed

+321
-6
lines changed

asyncpg/connection.py

+56-1
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,51 @@ def _drop_global_statement_cache(self):
12361236
else:
12371237
self._drop_local_statement_cache()
12381238

1239+
async def reload_schema_state(self):
1240+
"""Indicate that the database schema information must be reloaded.
1241+
1242+
For performance reasons, asyncpg caches certain aspects of the
1243+
database schema, such as the layout of composite types. Consequently,
1244+
when the database schema changes, and asyncpg is not able to
1245+
gracefully recover from an error caused by outdated schema
1246+
assumptions, an :exc:`~asyncpg.exceptions.OutdatedSchemaCacheError`
1247+
is raised. To prevent the exception, this method may be used to inform
1248+
asyncpg that the database schema has changed.
1249+
1250+
Example:
1251+
1252+
.. code-block:: pycon
1253+
1254+
>>> import asyncpg
1255+
>>> import asyncio
1256+
>>> async def change_type(con):
1257+
... result = await con.fetch('SELECT id, info FROM tbl')
1258+
... # Change composite's attribute type "int"=>"text"
1259+
... await con.execute('ALTER TYPE custom DROP ATTRIBUTE y')
1260+
... await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text')
1261+
... await con.reload_schema_state()
1262+
... for id_, info in result:
1263+
... new = (info['x'], str(info['y']))
1264+
... await con.execute(
1265+
... 'UPDATE tbl SET info=$2 WHERE id=$1', id_, new)
1266+
...
1267+
>>> async def run():
1268+
... # Initial schema:
1269+
... # CREATE TYPE custom AS (x int, y int);
1270+
... # CREATE TABLE tbl(id int, info custom);
1271+
... con = await asyncpg.connect(user='postgres')
1272+
... async with con.transaction():
1273+
... # Prevent concurrent changes in the table
1274+
... await con.execute('LOCK TABLE tbl')
1275+
... await change_type(con)
1276+
...
1277+
>>> asyncio.get_event_loop().run_until_complete(run())
1278+
"""
1279+
# It is enough to clear the type cache only once, not in each
1280+
# connection in the pool.
1281+
self._protocol.get_settings().clear_type_cache()
1282+
self._drop_global_statement_cache()
1283+
12391284
async def _execute(self, query, args, limit, timeout, return_status=False):
12401285
with self._stmt_exclusive_section:
12411286
result, _ = await self.__execute(
@@ -1277,6 +1322,15 @@ async def _do_execute(self, query, executor, timeout, retry=True):
12771322
after = time.monotonic()
12781323
timeout -= after - before
12791324

1325+
except exceptions.OutdatedSchemaCacheError:
1326+
# This exception is raised when we detect a difference between
1327+
# cached type's info and incoming tuple from the DB (when a type is
1328+
# changed by the ALTER TYPE).
1329+
# It is not possible to recover (the statement is already done at
1330+
# the server's side), the only way is to drop our caches and
1331+
# reraise the exception to the caller.
1332+
await self.reload_schema_state()
1333+
raise
12801334
except exceptions.InvalidCachedStatementError:
12811335
# PostgreSQL will raise an exception when it detects
12821336
# that the result type of the query has changed from
@@ -1516,7 +1570,8 @@ def get(self, query, *, promote=True):
15161570

15171571
if entry._statement.closed:
15181572
# Happens in unittests when we call `stmt._state.mark_closed()`
1519-
# manually.
1573+
# manually or when a prepared statement closes itself on type
1574+
# cache error.
15201575
self._entries.pop(query)
15211576
self._clear_entry_callback(entry)
15221577
return

asyncpg/exceptions/_base.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
__all__ = ('PostgresError', 'FatalPostgresError', 'UnknownPostgresError',
1414
'InterfaceError', 'InterfaceWarning', 'PostgresLogMessage',
15-
'InternalClientError')
15+
'InternalClientError', 'OutdatedSchemaCacheError')
1616

1717

1818
def _is_asyncpg_class(cls):
@@ -221,6 +221,16 @@ class InternalClientError(Exception):
221221
pass
222222

223223

224+
class OutdatedSchemaCacheError(InternalClientError):
225+
"""A value decoding error caused by a schema change before row fetching."""
226+
227+
def __init__(self, msg, *, schema=None, data_type=None, position=None):
228+
super().__init__(msg)
229+
self.schema_name = schema
230+
self.data_type_name = data_type
231+
self.position = position
232+
233+
224234
class PostgresLogMessage(PostgresMessage):
225235
"""A base class for non-error server messages."""
226236

asyncpg/prepared_stmt.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,17 @@ async def fetchrow(self, *args, timeout=None):
198198

199199
async def __bind_execute(self, args, limit, timeout):
200200
protocol = self._connection._protocol
201-
data, status, _ = await protocol.bind_execute(
202-
self._state, args, '', limit, True, timeout)
201+
try:
202+
data, status, _ = await protocol.bind_execute(
203+
self._state, args, '', limit, True, timeout)
204+
except exceptions.OutdatedSchemaCacheError:
205+
await self._connection.reload_schema_state()
206+
# We can not find all manually created prepared statements, so just
207+
# drop known cached ones in the `self._connection`.
208+
# Other manually created prepared statements will fail and
209+
# invalidate themselves (unfortunately, clearing caches again).
210+
self._state.mark_closed()
211+
raise
203212
self._last_status = status
204213
return data
205214

asyncpg/protocol/codecs/base.pyx

+19-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
66

77

8+
from asyncpg.exceptions import OutdatedSchemaCacheError
9+
10+
811
cdef void* binary_codec_map[(MAXSUPPORTEDOID + 1) * 2]
912
cdef void* text_codec_map[(MAXSUPPORTEDOID + 1) * 2]
1013
cdef dict TYPE_CODECS_CACHE = {}
@@ -190,20 +193,34 @@ cdef class Codec:
190193
FastReadBuffer elem_buf = FastReadBuffer.new()
191194

192195
elem_count = <ssize_t><uint32_t>hton.unpack_int32(buf.read(4))
196+
if elem_count != len(self.element_type_oids):
197+
raise OutdatedSchemaCacheError(
198+
'unexpected number of attributes of composite type: '
199+
'{}, expected {}'
200+
.format(
201+
elem_count,
202+
len(self.element_type_oids),
203+
),
204+
schema=self.schema,
205+
data_type=self.name,
206+
)
193207
result = record.ApgRecord_New(self.element_names, elem_count)
194208
for i in range(elem_count):
195209
elem_typ = self.element_type_oids[i]
196210
received_elem_typ = <uint32_t>hton.unpack_int32(buf.read(4))
197211

198212
if received_elem_typ != elem_typ:
199-
raise RuntimeError(
213+
raise OutdatedSchemaCacheError(
200214
'unexpected data type of composite type attribute {}: '
201215
'{!r}, expected {!r}'
202216
.format(
203217
i,
204218
TYPEMAP.get(received_elem_typ, received_elem_typ),
205219
TYPEMAP.get(elem_typ, elem_typ)
206-
)
220+
),
221+
schema=self.schema,
222+
data_type=self.name,
223+
position=i,
207224
)
208225

209226
elem_len = hton.unpack_int32(buf.read(4))

asyncpg/protocol/settings.pxd

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ cdef class ConnectionSettings:
2222
decoder, format)
2323
cpdef inline remove_python_codec(
2424
self, typeoid, typename, typeschema)
25+
cpdef inline clear_type_cache(self)
2526
cpdef inline set_builtin_type_codec(
2627
self, typeoid, typename, typeschema, typekind, alias_to)
2728
cpdef inline Codec get_data_codec(self, uint32_t oid, ServerDataFormat format=*)

asyncpg/protocol/settings.pyx

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ cdef class ConnectionSettings:
6060
cpdef inline remove_python_codec(self, typeoid, typename, typeschema):
6161
self._data_codecs.remove_python_codec(typeoid, typename, typeschema)
6262

63+
cpdef inline clear_type_cache(self):
64+
self._data_codecs.clear_type_cache()
65+
6366
cpdef inline set_builtin_type_codec(self, typeoid, typename, typeschema,
6467
typekind, alias_to):
6568
self._data_codecs.set_builtin_type_codec(typeoid, typename, typeschema,

0 commit comments

Comments
 (0)