@@ -52,6 +52,7 @@ def test_concurrent(self):
52
52
53
53
while True :
54
54
# update some rows to check for deadlocks
55
+ # import ipdb; ipdb.set_trace()
55
56
node .safe_psql ('postgres' ,
56
57
'''
57
58
update abc set t = 'test'
@@ -135,5 +136,95 @@ def test_replication(self):
135
136
0
136
137
)
137
138
139
+ def test_locks (self ):
140
+ """Test that a session trying to create new partitions waits for other
141
+ sessions if they doing the same"""
142
+
143
+ import threading
144
+ import time
145
+
146
+ class Flag :
147
+ def __init__ (self , value ):
148
+ self .flag = value
149
+
150
+ def set (self , value ):
151
+ self .flag = value
152
+
153
+ def get (self ):
154
+ return self .flag
155
+
156
+ # There is one flag for each thread which shows if thread have done
157
+ # its work
158
+ flags = [Flag (False ) for i in xrange (3 )]
159
+
160
+ # All threads synchronizes though this lock
161
+ lock = threading .Lock ()
162
+
163
+ # Define thread function
164
+ def add_partition (node , flag , query ):
165
+ """ We expect that this query will wait until another session
166
+ commits or rolls back"""
167
+ node .safe_psql ('postgres' , query )
168
+ with lock :
169
+ flag .set (True )
170
+
171
+ # Initialize master server
172
+ node = get_new_node ('master' )
173
+ node .init ()
174
+ node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
175
+ node .start ()
176
+ node .safe_psql (
177
+ 'postgres' ,
178
+ 'create extension pg_pathman; '
179
+ + 'create table abc(id serial, t text); '
180
+ + 'insert into abc select generate_series(1, 100000); '
181
+ + 'select create_range_partitions(\' abc\' , \' id\' , 1, 50000);'
182
+ )
183
+
184
+ # Start transaction that will create partition
185
+ con = node .connect ()
186
+ con .begin ()
187
+ con .execute ('select append_range_partition(\' abc\' )' )
188
+
189
+ # Start threads that suppose to add new partitions and wait some time
190
+ query = [
191
+ 'select prepend_range_partition(\' abc\' )' ,
192
+ 'select append_range_partition(\' abc\' )' ,
193
+ 'select add_range_partition(\' abc\' , 500000, 550000)' ,
194
+ ]
195
+ threads = []
196
+ for i in range (3 ):
197
+ thread = \
198
+ threading .Thread (target = add_partition , args = (node , flags [i ], query [i ]))
199
+ threads .append (thread )
200
+ thread .start ()
201
+ time .sleep (3 )
202
+
203
+ # This threads should wait until current transaction finished
204
+ with lock :
205
+ for i in range (3 ):
206
+ self .assertEqual (flags [i ].get (), False )
207
+
208
+ # Commit transaction. Since then other sessions can create partitions
209
+ con .commit ()
210
+
211
+ # Now wait until each thread finishes
212
+ for i in range (3 ):
213
+ threads [i ].join ()
214
+
215
+ # Check flags, it should be true which means that threads are finished
216
+ with lock :
217
+ for i in range (3 ):
218
+ self .assertEqual (flags [i ].get (), True )
219
+
220
+ # Check that all partitions are created
221
+ self .assertEqual (
222
+ node .safe_psql (
223
+ 'postgres' ,
224
+ 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
225
+ ),
226
+ '6\n '
227
+ )
228
+
138
229
if __name__ == "__main__" :
139
230
unittest .main ()
0 commit comments