@@ -40,101 +40,115 @@ def catchup_replica(self, master, replica):
40
40
'FROM pg_stat_replication WHERE application_name = \' %s\' '
41
41
% replica .name )
42
42
43
+ def printlog (self , logfile ):
44
+ with open (logfile , 'r' ) as log :
45
+ for line in log .readlines ():
46
+ print line
47
+
43
48
def test_concurrent (self ):
44
49
"""Tests concurrent partitioning"""
45
50
node = get_new_node ('test' )
46
- node .init ()
47
- node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
48
- node .start ()
49
- self .init_test_data (node )
50
-
51
- node .psql ('postgres' , 'select partition_table_concurrently(\' abc\' )' )
52
-
53
- while True :
54
- # update some rows to check for deadlocks
55
- # import ipdb; ipdb.set_trace()
56
- node .safe_psql ('postgres' ,
57
- '''
58
- update abc set t = 'test'
59
- where id in (select (random() * 300000)::int from generate_series(1, 3000))
60
- ''' )
61
-
62
- count = node .execute ('postgres' , 'select count(*) from pathman_concurrent_part_tasks' )
63
-
64
- # if there is no active workers then it means work is done
65
- if count [0 ][0 ] == 0 :
66
- break
67
- time .sleep (1 )
68
-
69
- data = node .execute ('postgres' , 'select count(*) from only abc' )
70
- self .assertEqual (data [0 ][0 ], 0 )
71
- data = node .execute ('postgres' , 'select count(*) from abc' )
72
- self .assertEqual (data [0 ][0 ], 300000 )
73
-
74
- node .stop ()
51
+ try :
52
+ node .init ()
53
+ node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
54
+ node .start ()
55
+ self .init_test_data (node )
56
+
57
+ node .psql ('postgres' , 'select partition_table_concurrently(\' abc\' )' )
58
+
59
+ while True :
60
+ # update some rows to check for deadlocks
61
+ # import ipdb; ipdb.set_trace()
62
+ node .safe_psql ('postgres' ,
63
+ '''
64
+ update abc set t = 'test'
65
+ where id in (select (random() * 300000)::int from generate_series(1, 3000))
66
+ ''' )
67
+
68
+ count = node .execute ('postgres' , 'select count(*) from pathman_concurrent_part_tasks' )
69
+
70
+ # if there is no active workers then it means work is done
71
+ if count [0 ][0 ] == 0 :
72
+ break
73
+ time .sleep (1 )
74
+
75
+ data = node .execute ('postgres' , 'select count(*) from only abc' )
76
+ self .assertEqual (data [0 ][0 ], 0 )
77
+ data = node .execute ('postgres' , 'select count(*) from abc' )
78
+ self .assertEqual (data [0 ][0 ], 300000 )
79
+
80
+ node .stop ()
81
+ except Exception , e :
82
+ self .printlog (node .logs_dir + '/postgresql.log' )
83
+ raise e
75
84
76
85
def test_replication (self ):
77
86
"""Tests how pg_pathman works with replication"""
78
87
node = get_new_node ('master' )
79
88
replica = get_new_node ('repl' )
80
89
81
- # initialize master server
82
- node .init (allows_streaming = True )
83
- node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
84
- node .start ()
85
- node .backup ('my_backup' )
86
-
87
- # initialize replica from backup
88
- replica .init_from_backup (node , 'my_backup' , has_streaming = True )
89
- replica .start ()
90
-
91
- # initialize pg_pathman extension and some test data
92
- self .init_test_data (node )
93
-
94
- # wait until replica catches up
95
- self .catchup_replica (node , replica )
96
-
97
- # check that results are equal
98
- self .assertEqual (
99
- node .psql ('postgres' , 'explain (costs off) select * from abc' ),
100
- replica .psql ('postgres' , 'explain (costs off) select * from abc' )
101
- )
102
-
103
- # enable parent and see if it is enabled in replica
104
- node .psql ('postgres' , 'select enable_parent(\' abc\' ' )
105
-
106
- self .catchup_replica (node , replica )
107
- self .assertEqual (
108
- node .psql ('postgres' , 'explain (costs off) select * from abc' ),
109
- replica .psql ('postgres' , 'explain (costs off) select * from abc' )
110
- )
111
- self .assertEqual (
112
- node .psql ('postgres' , 'select * from abc' ),
113
- replica .psql ('postgres' , 'select * from abc' )
114
- )
115
- self .assertEqual (
116
- node .execute ('postgres' , 'select count(*) from abc' )[0 ][0 ],
117
- 300000
118
- )
119
-
120
- # check that direct UPDATE in pathman_config_params invalidates
121
- # cache
122
- node .psql (
123
- 'postgres' ,
124
- 'update pathman_config_params set enable_parent = false' )
125
- self .catchup_replica (node , replica )
126
- self .assertEqual (
127
- node .psql ('postgres' , 'explain (costs off) select * from abc' ),
128
- replica .psql ('postgres' , 'explain (costs off) select * from abc' )
129
- )
130
- self .assertEqual (
131
- node .psql ('postgres' , 'select * from abc' ),
132
- replica .psql ('postgres' , 'select * from abc' )
133
- )
134
- self .assertEqual (
135
- node .execute ('postgres' , 'select count(*) from abc' )[0 ][0 ],
136
- 0
137
- )
90
+ try :
91
+ # initialize master server
92
+ node .init (allows_streaming = True )
93
+ node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
94
+ node .start ()
95
+ node .backup ('my_backup' )
96
+
97
+ # initialize replica from backup
98
+ replica .init_from_backup (node , 'my_backup' , has_streaming = True )
99
+ replica .start ()
100
+
101
+ # initialize pg_pathman extension and some test data
102
+ self .init_test_data (node )
103
+
104
+ # wait until replica catches up
105
+ self .catchup_replica (node , replica )
106
+
107
+ # check that results are equal
108
+ self .assertEqual (
109
+ node .psql ('postgres' , 'explain (costs off) select * from abc' ),
110
+ replica .psql ('postgres' , 'explain (costs off) select * from abc' )
111
+ )
112
+
113
+ # enable parent and see if it is enabled in replica
114
+ node .psql ('postgres' , 'select enable_parent(\' abc\' ' )
115
+
116
+ self .catchup_replica (node , replica )
117
+ self .assertEqual (
118
+ node .psql ('postgres' , 'explain (costs off) select * from abc' ),
119
+ replica .psql ('postgres' , 'explain (costs off) select * from abc' )
120
+ )
121
+ self .assertEqual (
122
+ node .psql ('postgres' , 'select * from abc' ),
123
+ replica .psql ('postgres' , 'select * from abc' )
124
+ )
125
+ self .assertEqual (
126
+ node .execute ('postgres' , 'select count(*) from abc' )[0 ][0 ],
127
+ 300000
128
+ )
129
+
130
+ # check that direct UPDATE in pathman_config_params invalidates
131
+ # cache
132
+ node .psql (
133
+ 'postgres' ,
134
+ 'update pathman_config_params set enable_parent = false' )
135
+ self .catchup_replica (node , replica )
136
+ self .assertEqual (
137
+ node .psql ('postgres' , 'explain (costs off) select * from abc' ),
138
+ replica .psql ('postgres' , 'explain (costs off) select * from abc' )
139
+ )
140
+ self .assertEqual (
141
+ node .psql ('postgres' , 'select * from abc' ),
142
+ replica .psql ('postgres' , 'select * from abc' )
143
+ )
144
+ self .assertEqual (
145
+ node .execute ('postgres' , 'select count(*) from abc' )[0 ][0 ],
146
+ 0
147
+ )
148
+ except Exception , e :
149
+ self .printlog (node .logs_dir + '/postgresql.log' )
150
+ self .printlog (replica .logs_dir + '/postgresql.log' )
151
+ raise e
138
152
139
153
def test_locks (self ):
140
154
"""Test that a session trying to create new partitions waits for other
@@ -170,61 +184,66 @@ def add_partition(node, flag, query):
170
184
171
185
# Initialize master server
172
186
node = get_new_node ('master' )
173
- node .init ()
174
- node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
175
- node .start ()
176
- node .safe_psql (
177
- 'postgres' ,
178
- 'create extension pg_pathman; '
179
- + 'create table abc(id serial, t text); '
180
- + 'insert into abc select generate_series(1, 100000); '
181
- + 'select create_range_partitions(\' abc\' , \' id\' , 1, 50000);'
182
- )
183
-
184
- # Start transaction that will create partition
185
- con = node .connect ()
186
- con .begin ()
187
- con .execute ('select append_range_partition(\' abc\' )' )
188
-
189
- # Start threads that suppose to add new partitions and wait some time
190
- query = [
191
- 'select prepend_range_partition(\' abc\' )' ,
192
- 'select append_range_partition(\' abc\' )' ,
193
- 'select add_range_partition(\' abc\' , 500000, 550000)' ,
194
- ]
195
- threads = []
196
- for i in range (3 ):
197
- thread = \
198
- threading .Thread (target = add_partition , args = (node , flags [i ], query [i ]))
199
- threads .append (thread )
200
- thread .start ()
201
- time .sleep (3 )
202
-
203
- # This threads should wait until current transaction finished
204
- with lock :
187
+
188
+ try :
189
+ node .init ()
190
+ node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
191
+ node .start ()
192
+ node .safe_psql (
193
+ 'postgres' ,
194
+ 'create extension pg_pathman; '
195
+ + 'create table abc(id serial, t text); '
196
+ + 'insert into abc select generate_series(1, 100000); '
197
+ + 'select create_range_partitions(\' abc\' , \' id\' , 1, 50000);'
198
+ )
199
+
200
+ # Start transaction that will create partition
201
+ con = node .connect ()
202
+ con .begin ()
203
+ con .execute ('select append_range_partition(\' abc\' )' )
204
+
205
+ # Start threads that suppose to add new partitions and wait some time
206
+ query = [
207
+ 'select prepend_range_partition(\' abc\' )' ,
208
+ 'select append_range_partition(\' abc\' )' ,
209
+ 'select add_range_partition(\' abc\' , 500000, 550000)' ,
210
+ ]
211
+ threads = []
205
212
for i in range (3 ):
206
- self .assertEqual (flags [i ].get (), False )
213
+ thread = \
214
+ threading .Thread (target = add_partition , args = (node , flags [i ], query [i ]))
215
+ threads .append (thread )
216
+ thread .start ()
217
+ time .sleep (3 )
207
218
208
- # Commit transaction. Since then other sessions can create partitions
209
- con .commit ()
219
+ # This threads should wait until current transaction finished
220
+ with lock :
221
+ for i in range (3 ):
222
+ self .assertEqual (flags [i ].get (), False )
210
223
211
- # Now wait until each thread finishes
212
- for i in range (3 ):
213
- threads [i ].join ()
224
+ # Commit transaction. Since then other sessions can create partitions
225
+ con .commit ()
214
226
215
- # Check flags, it should be true which means that threads are finished
216
- with lock :
227
+ # Now wait until each thread finishes
217
228
for i in range (3 ):
218
- self . assertEqual ( flags [i ].get (), True )
229
+ threads [i ].join ( )
219
230
220
- # Check that all partitions are created
221
- self .assertEqual (
222
- node .safe_psql (
223
- 'postgres' ,
224
- 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
225
- ),
226
- '6\n '
227
- )
231
+ # Check flags, it should be true which means that threads are finished
232
+ with lock :
233
+ for i in range (3 ):
234
+ self .assertEqual (flags [i ].get (), True )
235
+
236
+ # Check that all partitions are created
237
+ self .assertEqual (
238
+ node .safe_psql (
239
+ 'postgres' ,
240
+ 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
241
+ ),
242
+ '6\n '
243
+ )
244
+ except Exception , e :
245
+ self .printlog (node .logs_dir + '/postgresql.log' )
246
+ raise e
228
247
229
248
if __name__ == "__main__" :
230
249
unittest .main ()
0 commit comments