From 07dc49bf301c5f60396a73d0998df226a304f485 Mon Sep 17 00:00:00 2001 From: Just van den Broecke Date: Mon, 22 Jan 2018 11:43:54 +0100 Subject: [PATCH] #65 replace DELETE by UPDATE for PostgresInsertOutput --- stetl/outputs/dboutput.py | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/stetl/outputs/dboutput.py b/stetl/outputs/dboutput.py index 0b98a99..d123e5a 100644 --- a/stetl/outputs/dboutput.py +++ b/stetl/outputs/dboutput.py @@ -88,16 +88,17 @@ def write(self, packet): class PostgresInsertOutput(PostgresDbOutput): """ - Output by inserting single record into Postgres database. - Input is a record (Python dic structure) or a Python list of dicts (records). + Output by inserting a single record in a Postgres database table. + Input is a Stetl record (Python dict structure) or a list of records. Creates an INSERT for Postgres to insert each single record. When the "replace" parameter is True, any existing record keyed by "key" is - attempted to be deleted first. + attempted to be UPDATEd first. - NB a constraint is that each record needs to contain all values as - an INSERT query is built once for the columns in the first record. + NB a constraint is that the first and each subsequent record needs to contain + all values as an INSERT and UPDATE query template is built once for the columns + in the first record. 
- consumes=FORMAT.record + consumes=[FORMAT.record_array, FORMAT.record] """ # Start attribute config meta @@ -127,6 +128,7 @@ def key(self): def __init__(self, configdict, section, consumes=FORMAT.record): DbOutput.__init__(self, configdict, section, consumes=[FORMAT.record_array, FORMAT.record]) self.query = None + self.update_query = None self.db = None def init(self): @@ -149,14 +151,31 @@ def create_query(self, record): log.info('query is %s', query) return query + def create_update_query(self, record): + # We assume that all records do the same UPDATE key/values + # https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838 + # e.g. UPDATE table SET field='C', field2='Z' WHERE id=3; + query = "UPDATE %s SET (%s) = (%s) WHERE %s = %s" % ( + self.cfg.get('table'), ",".join(['%s ' % k for k in record]), ",".join(["%s", ] * len(record.keys())), self.key, "%s") + log.info('update query is %s', query) + return query + def insert(self, record): + res = 0 if self.replace and self.key and self.key in record: - # Try to delete (replace option) - del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key]) - self.db.execute(del_query) - # Do insert with values from the record dict - self.db.execute(self.query, record.values()) + # Replace option: try UPDATE if existing + # https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838 + values = record.values() + values.append(record[self.key]) + res = self.db.execute(self.update_query, values) + # del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key]) + # res = self.db.execute(del_query) + + if res < 1: + # Do insert with values from the record dict + # only if we did not do an UPDATE (res==0) on existing record. 
+ self.db.execute(self.query, record.values()) self.db.commit(close=False) def write(self, packet): @@ -174,10 +193,13 @@ def write(self, packet): if type(record) is list and len(record) > 0: first_record = record[0] - # Create query once + # Create INSERT and optional UPDATE query-templates once if self.query is None: self.query = self.create_query(first_record) + if self.replace and self.key and not self.update_query: + self.update_query = self.create_update_query(first_record) + # Check if record is single (dict) or array (list of dict) if type(record) is dict: # Do insert with values from the single record