@@ -107,19 +107,29 @@ def parseargs():
107
107
parser .add_option ('-v' , '--verbose' , action = 'store_true' )
108
108
parser .add_option ('-s' , '--schema' , type = 'string' )
109
109
# TODO: perform check just for one/multiple schemas
110
- # TODO: perform check for multiple databases
110
+ parser . add_option ( '-A' , '--all' , action = 'store_true' )
111
111
parser .add_option ('-d' , '--database' , type = 'string' )
112
112
(options , args ) = parser .parse_args ()
113
113
114
114
USER = os .getenv ('USER' )
115
115
if USER is None or USER is ' ' :
116
- logger .error ('USER environment variable must be set. ' )
116
+ logger .error ('USER environment variable must be set' )
117
117
parser .exit ()
118
118
119
- if options .database is None :
120
- logger .error ('Database must be specified. ' )
119
+ if options .database is None and options . all is None :
120
+ logger .error ('Database must be specified' )
121
121
parser .exit ()
122
122
123
+ if options .database is not None and options .all is not None :
124
+ logger .info ('Can\' t specify -A and -d options at the same time' )
125
+ parser .exit ()
126
+
127
+ if options .database is not None :
128
+ logger .info ("Checking integrity of database %s" % options .database )
129
+
130
+ if options .all is not None :
131
+ logger .info ("Checking integrity of all databases" )
132
+
123
133
return options
124
134
125
135
@@ -171,6 +181,27 @@ def get_gp_segment_configuration(database=None):
171
181
return cfg
172
182
173
183
184
+ def get_databases ():
185
+ """
186
+ This function returns the list of databases present in the Greenplum cluster.
187
+ :return: list of databases
188
+ """
189
+ db = connect (database = 'template1' )
190
+ database_list = []
191
+
192
+ # Retrieve all non-catalog/non partitioned parent tables
193
+ qry = '''
194
+ SELECT datname
195
+ FROM pg_database
196
+ where datname not like 'template0'
197
+ '''
198
+ curs = db .query (qry )
199
+ for row in curs .dictresult ():
200
+ database_list .append (row )
201
+ db .close ()
202
+ return database_list
203
+
204
+
174
205
def get_tables (database = None ):
175
206
db = connect (database = database )
176
207
table_list = []
@@ -220,6 +251,29 @@ def get_tables_in_schema(database, schema):
220
251
return table_list
221
252
222
253
254
+ def spawn_threads (database , schema = None ):
255
+ dbids = get_gp_segment_configuration () # get Greenplum segment information
256
+ tables = list ()
257
+
258
+ if schema is not None :
259
+ tables = get_tables_in_schema (database , schema )
260
+ logger .info ("Checking only tables in schema %s" % schema )
261
+ else :
262
+ tables = get_tables (database = database ) # get table list
263
+
264
+ threads = []
265
+
266
+ for dbid in dbids :
267
+ if dbids [dbid ]['isprimary' ] == 't' :
268
+ th = CheckIntegrity (tables , dbids [dbid ]['hostname' ], database , dbids [dbid ]['content' ], dbids [dbid ]['port' ])
269
+ th .start ()
270
+ threads .append (th )
271
+
272
+ for thread in threads :
273
+ logger .debug ('waiting on thread %s' % thread .getName ())
274
+ thread .join ()
275
+
276
+
223
277
class CheckIntegrity (Thread ):
224
278
def __init__ (self , tables , hostname , database , content , port ):
225
279
Thread .__init__ (self )
@@ -242,11 +296,11 @@ class CheckIntegrity(Thread):
242
296
except DatabaseError , de :
243
297
# TODO: better error summary report
244
298
logger .error ('Failed for table %s.%s at seg%s' % (table ['schema' ], table ['table' ], self .content ))
245
- logger .error ('ERROR: %s' % str (de ).strip ())
299
+ logger .error ('%s' % str (de ).strip ())
246
300
247
301
# Append this table name to reported_table list
248
302
table_lock .acquire ()
249
- reported_tables .append ("%s .%s in %s:%d gpseg%s" % (table ['schema' ], table ['table' ],
303
+ reported_tables .append ("[%s] %s .%s in %s:%d gpseg%s" % (self . database , table ['schema' ], table ['table' ],
250
304
self .hostname , self .port , self .content ))
251
305
table_lock .release ()
252
306
@@ -262,39 +316,26 @@ if __name__ == '__main__':
262
316
setup_tool_logging (EXECNAME , getLocalHostname (), getUserName ())
263
317
264
318
options = parseargs ()
265
- logger .info ("Checking integrity of database %s" % options .database )
266
319
267
320
if options .verbose :
268
321
enable_verbose_logging ()
269
322
270
323
try :
271
- # TODO: List number of databases/schemas/tables to be checked and prompt to continue
272
- dbids = get_gp_segment_configuration () # get Greenplum segment information
273
-
274
- tables = list ()
275
-
276
- if options .schema :
277
- tables = get_tables_in_schema (options .database , options .schema )
278
- logger .info ("Checking only tables in schema %s" % options .schema )
279
- else :
280
- tables = get_tables (database = options .database ) # get table list
281
-
282
- threads = []
283
324
reported_tables = []
284
325
table_lock = Lock ()
285
326
286
- for dbid in dbids :
287
- if dbids [ dbid ][ 'isprimary' ] == 't' :
288
- th = CheckIntegrity ( tables , dbids [ dbid ][ 'hostname' ], options . database , dbids [ dbid ][ 'content' ],
289
- dbids [ dbid ][ 'port ' ])
290
- th . start ( )
291
- threads . append ( th )
292
-
293
- for thread in threads :
294
- logger . debug ( 'waiting on thread %s' % thread . getName ())
295
- thread . join ( )
327
+ if options . all is not None :
328
+ for db in get_databases () :
329
+ # TODO: List number of databases/schemas/ tables to be checked and prompt to continue
330
+ logger . info ( "Checking database %s" % db [ 'datname ' ])
331
+ spawn_threads ( db [ 'datname' ] )
332
+ else :
333
+ if options . schema is not None :
334
+ spawn_threads ( options . database , options . schema )
335
+ else :
336
+ spawn_threads ( options . database )
296
337
297
- logger .info ("REPORT SUMMARY %s" % datetime .now ())
338
+ logger .info ("ERROR REPORT SUMMARY %s" % datetime .now ())
298
339
logger .info ("============================================" )
299
340
300
341
if len (reported_tables ) == 0 :
0 commit comments