@@ -154,14 +154,12 @@ def default_insert_value(column)
154
154
private :default_insert_value
155
155
156
156
def build_insert_sql ( insert ) # :nodoc:
157
- if insert . skip_duplicates?
157
+ if insert . skip_duplicates? || insert . update_duplicates?
158
158
insert_all = insert . send ( :insert_all )
159
159
conflict_columns = get_conflicted_columns ( insert_all :, insert :)
160
160
161
161
# if we do not have any columns that might have conflicting values, just execute a regular insert
162
- return build_sql_for_regular_insert ( insert ) if conflict_columns . empty?
163
-
164
- make_inserts_unique ( insert_all :, conflict_columns :)
162
+ return build_sql_for_regular_insert ( insert ) if conflict_columns . flatten . empty?
165
163
166
164
primary_keys_for_insert = insert_all . primary_keys . to_set
167
165
@@ -172,39 +170,92 @@ def build_insert_sql(insert) # :nodoc:
172
170
enable_identity_insert = primary_keys_for_insert . length == 1 &&
173
171
( insert_all . primary_keys . to_set & insert . keys ) . present?
174
172
175
- sql = +""
176
- sql << "SET IDENTITY_INSERT #{ insert . model . quoted_table_name } ON;" if enable_identity_insert
177
- sql << "MERGE INTO #{ insert . model . quoted_table_name } WITH (UPDLOCK, HOLDLOCK) AS target"
178
- sql << " USING (SELECT DISTINCT * FROM (#{ insert . values_list } ) AS t1 (#{ insert . send ( :columns_list ) } )) AS source"
179
- sql << " ON (#{ conflict_columns . map do |columns |
180
- columns . map do |column |
181
- "target.#{ quote_column_name ( column ) } = source.#{ quote_column_name ( column ) } "
182
- end . join ( " AND " )
183
- end . join ( ") OR (" ) } )"
184
- sql << " WHEN NOT MATCHED BY TARGET THEN"
185
- sql << " INSERT (#{ insert . send ( :columns_list ) } ) #{ insert . values_list } "
186
- if ( returning = insert_all . returning )
187
- sql << " OUTPUT " << returning . map { |column | "INSERTED.#{ quote_column_name ( column ) } " } . join ( ", " )
173
+ # why is the "PARTITION BY" clause needed?
174
+ # in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens
175
+ # when duplicates are found (SKIP OR UPDATE)
176
+ # by default rows are considered to be unique by every unique index on the table
177
+ # but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves
178
+ # otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables
179
+ # this works easiest by using PARTITION and make sure that any record
180
+ # we are trying to insert is "the first one seen across all the potential conflicted columns"
181
+ sql = <<~SQL
182
+ #{ "SET IDENTITY_INSERT #{ insert . model . quoted_table_name } ON;" if enable_identity_insert }
183
+ MERGE INTO #{ insert . model . quoted_table_name } WITH (UPDLOCK, HOLDLOCK) AS target
184
+ USING (
185
+ SELECT *
186
+ FROM (
187
+ SELECT #{ insert . send ( :columns_list ) } , #{conflict_columns . map . with_index do |group_of_conflicted_columns , index |
188
+ <<~PARTITION_BY
189
+ ROW_NUMBER ( ) OVER (
190
+ PARTITION BY #{group_of_conflicted_columns.map { |column| quote_column_name(column) }.join(",")}
191
+ ORDER BY #{group_of_conflicted_columns.map { |column| "#{quote_column_name(column)} DESC" }.join(",")}
192
+ ) AS rn_ #{index}
193
+ PARTITION_BY
194
+ end . join ( ", " )
195
+ }
196
+ FROM ( #{insert.values_list})
197
+ AS t1 ( #{insert.send(:columns_list)})
198
+ ) AS ranked_source
199
+ WHERE #{conflict_columns.map.with_index do |group_of_conflicted_columns, index|
200
+ "rn_#{ index } = 1"
201
+ end . join ( " AND " )
202
+ }
203
+ ) AS source
204
+ ON ( #{conflict_columns.map do |columns|
205
+ columns . map do |column |
206
+ "target.#{ quote_column_name ( column ) } = source.#{ quote_column_name ( column ) } "
207
+ end . join ( " AND " )
208
+ end . join ( ") OR (" ) } )
209
+ SQL
210
+
211
+ if insert . update_duplicates?
212
+ sql << " WHEN MATCHED THEN UPDATE SET "
213
+
214
+ if insert . raw_update_sql?
215
+ sql << insert . raw_update_sql
216
+ else
217
+ if insert . record_timestamps?
218
+ sql << insert . model . timestamp_attributes_for_update_in_model . filter_map do |column_name |
219
+ if insert . send ( :touch_timestamp_attribute? , column_name )
220
+ "target.#{ quote_column_name ( column_name ) } =CASE WHEN (#{ insert . updatable_columns . map { |column | "(COALESCE(target.#{ quote_column_name ( column ) } , 'NULL') = COALESCE(source.#{ quote_column_name ( column ) } , 'NULL'))" } . join ( " AND " ) } ) THEN target.#{ quote_column_name ( column_name ) } ELSE #{ high_precision_current_timestamp } END,"
221
+ end
222
+ end . join
223
+ end
224
+
225
+ sql << insert . updatable_columns . map { |column | "target.#{ quote_column_name ( column ) } =source.#{ quote_column_name ( column ) } " } . join ( "," )
226
+ end
188
227
end
228
+ sql << " WHEN NOT MATCHED BY TARGET THEN"
229
+ sql << " INSERT (#{ insert . send ( :columns_list ) } ) VALUES (#{ insert_all . keys_including_timestamps . map { |column | "source.#{ quote_column_name ( column ) } " } . join ( ", " ) } )"
230
+ sql << build_sql_for_returning ( insert :, insert_all : insert . send ( :insert_all ) )
231
+
189
232
sql << ";"
190
233
sql << "SET IDENTITY_INSERT #{ insert . model . quoted_table_name } OFF;" if enable_identity_insert
234
+
191
235
return sql
192
236
end
193
237
194
238
build_sql_for_regular_insert ( insert )
195
239
end
196
240
197
- # MERGE executes a JOIN between our data we would like to insert and the existing data in the table
198
- # but since it is a JOIN, it requires the data in the source also to be unique (aka our values to insert)
199
- # here we modify @inserts in place of the "insert_all" object to be unique
200
- # keeping the last occurence
201
- # note that for other DBMS, this job is usually handed off to them by specifying something like
202
- # "ON DUPLICATE SKIP" or "ON DUPLICATE UPDATE"
203
- def make_inserts_unique ( insert_all :, conflict_columns :)
204
- unique_inserts = insert_all . inserts . reverse . uniq { |insert | conflict_columns . map { |column | insert [ column ] } } . reverse
205
- insert_all . instance_variable_set ( :@inserts , unique_inserts )
241
+ def build_sql_for_returning ( insert :, insert_all :)
242
+ return "" unless insert_all . returning
243
+
244
+ returning_values_sql = if insert_all . returning . is_a? ( String )
245
+ insert_all . returning
246
+ else
247
+ Array ( insert_all . returning ) . map do |attribute |
248
+ if insert . model . attribute_alias? ( attribute )
249
+ "INSERTED.#{ quote_column_name ( insert . model . attribute_alias ( attribute ) ) } AS #{ quote_column_name ( attribute ) } "
250
+ else
251
+ "INSERTED.#{ quote_column_name ( attribute ) } "
252
+ end
253
+ end . join ( "," )
254
+ end
255
+
256
+ " OUTPUT #{ returning_values_sql } "
206
257
end
207
- private :make_inserts_unique
258
+ private :build_sql_for_returning
208
259
209
260
def get_conflicted_columns ( insert_all :, insert :)
210
261
if ( unique_by = insert_all . unique_by )
@@ -223,17 +274,7 @@ def get_conflicted_columns(insert_all:, insert:)
223
274
def build_sql_for_regular_insert ( insert )
224
275
sql = "INSERT #{ insert . into } "
225
276
226
- returning = insert . send ( :insert_all ) . returning
227
-
228
- if returning
229
- returning_sql = if returning . is_a? ( String )
230
- returning
231
- else
232
- Array ( returning ) . map { |column | "INSERTED.#{ quote_column_name ( column ) } " } . join ( ", " )
233
- end
234
- sql << " OUTPUT #{ returning_sql } "
235
- end
236
-
277
+ sql << build_sql_for_returning ( insert :, insert_all : insert . send ( :insert_all ) )
237
278
sql << " #{ insert . values_list } "
238
279
sql
239
280
end
0 commit comments