@@ -262,7 +262,15 @@ def executemany(self: "Cursor", operation, param_sets) -> "Cursor":
262
262
self ._redshift_row_count = - 1 if - 1 in redshift_rowcounts else sum (rowcounts )
263
263
return self
264
264
265
- def insert_data_bulk (self : "Cursor" , filename , table_name , column_indexes , column_names , delimeter ) -> "Cursor" :
265
+ def insert_data_bulk (
266
+ self : "Cursor" ,
267
+ filename : str ,
268
+ table_name : str ,
269
+ parameter_indices : typing .List [int ],
270
+ column_names : typing .List [str ],
271
+ delimiter : str ,
272
+ batch_size : int = 1 ,
273
+ ) -> "Cursor" :
266
274
267
275
"""runs a single bulk insert statement into the database.
268
276
This method is native to redshift_connector.
@@ -272,40 +280,55 @@ def insert_data_bulk(self: "Cursor", filename, table_name, column_indexes, colum
272
280
The name of the table to insert to.
273
281
:param column_names:list
274
282
The name of the columns in the table to insert to.
275
- :param column_indexes:list
276
- The indexes of the columns in the table to insert to.
277
- :param delimeter: str
278
- The delimeter to use when reading the file.
283
+ :param parameter_indices:list
284
+ The indexes of the columns in the file to insert to.
285
+ :param delimiter: str
286
+ The delimiter to use when reading the file.
287
+ :param batch_size: int
288
+ The number of rows to insert per insert statement. Minimum allowed value is 1.
279
289
Returns
280
290
-------
281
291
The Cursor object used for executing the specified database operation: :class:`Cursor`
282
292
"""
293
+ if batch_size < 1 :
294
+ raise InterfaceError ("batch_size must be greater than 1" )
283
295
if not self .__is_valid_table (table_name ):
284
296
raise InterfaceError ("Invalid table name passed to insert_data_bulk: {}" .format (table_name ))
285
297
if not self .__has_valid_columns (table_name , column_names ):
286
298
raise InterfaceError ("Invalid column names passed to insert_data_bulk: {}" .format (table_name ))
287
299
orig_paramstyle = self .paramstyle
288
300
import csv
289
301
290
- if len (column_names ) != len (column_indexes ):
291
- raise InterfaceError ("Column names and indexes must be the same length" )
292
- sql_query = f"INSERT INTO { table_name } ("
293
- sql_query += ", " .join (column_names )
294
- sql_query += ") VALUES "
295
- sql_param_list_template = "(" + ", " .join (["%s" ] * len (column_indexes )) + ")"
302
+ if len (column_names ) != len (parameter_indices ):
303
+ raise InterfaceError ("Column names and parameter indexes must be the same length" )
304
+ base_stmt = f"INSERT INTO { table_name } ("
305
+ base_stmt += ", " .join (column_names )
306
+ base_stmt += ") VALUES "
307
+ sql_param_list_template = "(" + ", " .join (["%s" ] * len (parameter_indices )) + ")"
296
308
try :
297
309
with open (filename ) as csv_file :
298
- reader = csv .reader (csv_file , delimiter = delimeter )
310
+ reader = csv .reader (csv_file , delimiter = delimiter )
299
311
next (reader )
300
- values_list = []
312
+ values_list : typing . List [ str ] = []
301
313
row_count = 0
302
314
for row in reader :
303
- for column_index in column_indexes :
315
+ if row_count == batch_size :
316
+ sql_param_lists = [sql_param_list_template ] * row_count
317
+ insert_stmt = base_stmt + ", " .join (sql_param_lists ) + ";"
318
+ self .execute (insert_stmt , values_list )
319
+ row_count = 0
320
+ values_list .clear ()
321
+
322
+ for column_index in parameter_indices :
304
323
values_list .append (row [column_index ])
324
+
305
325
row_count += 1
306
- sql_param_lists = [sql_param_list_template ] * row_count
307
- sql_query += ", " .join (sql_param_lists ) + ";"
308
- self .execute (sql_query , values_list )
326
+
327
+ if row_count :
328
+ sql_param_lists = [sql_param_list_template ] * row_count
329
+ insert_stmt = base_stmt + ", " .join (sql_param_lists ) + ";"
330
+ self .execute (insert_stmt , values_list )
331
+
309
332
except Exception as e :
310
333
raise InterfaceError (e )
311
334
finally :
0 commit comments