4
4
from enum import Enum
5
5
from typing import List , Optional
6
6
7
- from aioredis import Redis , ResponseError
7
+ from ... import redis
8
8
9
9
10
10
log = logging .getLogger (__name__ )
@@ -39,18 +39,19 @@ def schema_hash_key(index_name):
39
39
return f"{ index_name } :hash"
40
40
41
41
42
- async def create_index (redis : Redis , index_name , schema , current_hash ):
43
- db_number = redis .connection_pool .connection_kwargs .get ("db" )
42
+ async def create_index (conn : redis . Redis , index_name , schema , current_hash ):
43
+ db_number = conn .connection_pool .connection_kwargs .get ("db" )
44
44
if db_number and db_number > 0 :
45
45
raise MigrationError (
46
46
"Creating search indexes is only supported in database 0. "
47
47
f"You attempted to create an index in database { db_number } "
48
48
)
49
49
try :
50
- await redis .execute_command (f"ft.info { index_name } " )
51
- except ResponseError :
52
- await redis .execute_command (f"ft.create { index_name } { schema } " )
53
- await redis .set (schema_hash_key (index_name ), current_hash )
50
+ await conn .execute_command (f"ft.info { index_name } " )
51
+ except redis .ResponseError :
52
+ await conn .execute_command (f"ft.create { index_name } { schema } " )
53
+ # TODO: remove "type: ignore" when type stubs will be fixed
54
+ await conn .set (schema_hash_key (index_name ), current_hash ) # type: ignore
54
55
else :
55
56
log .info ("Index already exists, skipping. Index hash: %s" , index_name )
56
57
@@ -67,7 +68,7 @@ class IndexMigration:
67
68
schema : str
68
69
hash : str
69
70
action : MigrationAction
70
- redis : Redis
71
+ conn : redis . Redis
71
72
previous_hash : Optional [str ] = None
72
73
73
74
async def run (self ):
@@ -78,14 +79,14 @@ async def run(self):
78
79
79
80
async def create (self ):
80
81
try :
81
- await create_index (self .redis , self .index_name , self .schema , self .hash )
82
- except ResponseError :
82
+ await create_index (self .conn , self .index_name , self .schema , self .hash )
83
+ except redis . ResponseError :
83
84
log .info ("Index already exists: %s" , self .index_name )
84
85
85
86
async def drop (self ):
86
87
try :
87
- await self .redis .execute_command (f"FT.DROPINDEX { self .index_name } " )
88
- except ResponseError :
88
+ await self .conn .execute_command (f"FT.DROPINDEX { self .index_name } " )
89
+ except redis . ResponseError :
89
90
log .info ("Index does not exist: %s" , self .index_name )
90
91
91
92
@@ -105,7 +106,7 @@ async def detect_migrations(self):
105
106
106
107
for name , cls in model_registry .items ():
107
108
hash_key = schema_hash_key (cls .Meta .index_name )
108
- redis = cls .db ()
109
+ conn = cls .db ()
109
110
try :
110
111
schema = cls .redisearch_schema ()
111
112
except NotImplementedError :
@@ -114,21 +115,21 @@ async def detect_migrations(self):
114
115
current_hash = hashlib .sha1 (schema .encode ("utf-8" )).hexdigest () # nosec
115
116
116
117
try :
117
- await redis .execute_command ("ft.info" , cls .Meta .index_name )
118
- except ResponseError :
118
+ await conn .execute_command ("ft.info" , cls .Meta .index_name )
119
+ except redis . ResponseError :
119
120
self .migrations .append (
120
121
IndexMigration (
121
122
name ,
122
123
cls .Meta .index_name ,
123
124
schema ,
124
125
current_hash ,
125
126
MigrationAction .CREATE ,
126
- redis ,
127
+ conn ,
127
128
)
128
129
)
129
130
continue
130
131
131
- stored_hash = await redis .get (hash_key )
132
+ stored_hash = await conn .get (hash_key )
132
133
schema_out_of_date = current_hash != stored_hash
133
134
134
135
if schema_out_of_date :
@@ -140,7 +141,7 @@ async def detect_migrations(self):
140
141
schema ,
141
142
current_hash ,
142
143
MigrationAction .DROP ,
143
- redis ,
144
+ conn ,
144
145
stored_hash ,
145
146
)
146
147
)
@@ -151,7 +152,7 @@ async def detect_migrations(self):
151
152
schema ,
152
153
current_hash ,
153
154
MigrationAction .CREATE ,
154
- redis ,
155
+ conn ,
155
156
stored_hash ,
156
157
)
157
158
)
0 commit comments